Skip to main content

slim_datapath/
message_processing.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{HashMap, HashSet};
5use std::net::SocketAddr;
6use std::{pin::Pin, sync::Arc};
7
8use crate::api::DataPlaneServiceServer;
9use display_error_chain::ErrorChainExt;
10use parking_lot::RwLock;
11use slim_config::component::configuration::Configuration;
12use slim_config::grpc::client::ClientConfig;
13use slim_config::grpc::server::ServerConfig;
14use tokio::sync::mpsc::{self, Sender};
15use tokio::task::JoinHandle;
16use tokio_stream::wrappers::ReceiverStream;
17use tokio_stream::{Stream, StreamExt};
18use tokio_util::sync::CancellationToken;
19
20use tonic::{Request, Response, Status};
21use tracing::{Instrument, debug, error, info};
22
23#[cfg(feature = "otel_tracing")]
24use crate::otel_tracing;
25
26use crate::api::ProtoMessage;
27use crate::api::ProtoPublishType as PublishType;
28use crate::api::ProtoSubscribeType as SubscribeType;
29use crate::api::ProtoSubscriptionAckType as SubscriptionAckType;
30use crate::api::ProtoUnsubscribeType as UnsubscribeType;
31use crate::api::proto::dataplane::v1::Message;
32use crate::api::{
33    LinkNegotiationPayload, ProtoLink, ProtoLinkMessageType as LinkType, ProtoLinkType,
34};
35use semver;
36
37use crate::api::proto::dataplane::v1::data_plane_service_client::DataPlaneServiceClient;
38use crate::api::proto::dataplane::v1::data_plane_service_server::DataPlaneService;
39use crate::connection::{Channel, Connection, Type as ConnectionType};
40use crate::errors::{DataPathError, MessageContext};
41use crate::forwarder::Forwarder;
42use crate::messages::Name;
43use crate::messages::utils::SlimHeaderFlags;
44use crate::recovery::RecoveryTable;
45use crate::tables::connection_table::ConnectionTable;
46use crate::tables::remote_subscription_table::SubscriptionInfo;
47use crate::tables::subscription_table::SubscriptionTableImpl;
48
/// Returns the version string reported by `slim_version::version()`.
///
/// This is the value advertised to peers as this node's version in link
/// negotiation messages.
fn local_version() -> &'static str {
    slim_version::version()
}
52
53#[derive(Debug)]
54struct MessageProcessorInternal {
55    /// The forwarder to handle processing events
56    forwarder: Forwarder<Connection>,
57
58    /// Drain signal to gracefully close all pending tasks
59    drain_signal: parking_lot::RwLock<Option<drain::Signal>>,
60
61    ///Drain watch to receive drain signal
62    drain_watch: parking_lot::RwLock<Option<drain::Watch>>,
63
64    /// Tx channel towards control plane
65    tx_control_plane: RwLock<Option<Sender<Result<Message, Status>>>>,
66
67    /// Pending route-recovery state for server-side connections (see [`RecoveryTable`]).
68    recovery_table: RecoveryTable,
69
70    /// Remote subscription ACK manager
71    sub_ack_manager: crate::subscription_ack::RemoteSubAckManager,
72
73    /// Service ID for tracing
74    service_id: String,
75}
76
/// Handle to the data-plane message processor.
///
/// Cloning is cheap: every clone shares the same `Arc`-wrapped internal state.
#[derive(Debug, Clone)]
pub struct MessageProcessor {
    /// Shared state; see [`MessageProcessorInternal`].
    internal: Arc<MessageProcessorInternal>,
}
81
82impl Default for MessageProcessor {
83    fn default() -> Self {
84        Self::new_with_service_id(String::new())
85    }
86}
87
88impl MessageProcessor {
    /// Creates a processor tagged with `service_id`, using the default recovery
    /// table (no explicit recovery TTL).
    pub fn new_with_service_id(service_id: String) -> Self {
        Self::new_with_options(service_id, None)
    }
92
93    pub fn new_with_options(service_id: String, recovery_ttl: Option<std::time::Duration>) -> Self {
94        let (signal, watch) = drain::channel();
95        let recovery_table = match recovery_ttl {
96            Some(ttl) => RecoveryTable::new(ttl),
97            None => RecoveryTable::default(),
98        };
99        let internal = MessageProcessorInternal {
100            forwarder: Forwarder::new(),
101            drain_signal: RwLock::new(Some(signal)),
102            drain_watch: RwLock::new(Some(watch)),
103            tx_control_plane: RwLock::new(None),
104            recovery_table,
105            sub_ack_manager: crate::subscription_ack::RemoteSubAckManager::new(),
106            service_id,
107        };
108        Self {
109            internal: Arc::new(internal),
110        }
111    }
112
    /// Creates a processor with default settings; equivalent to `Self::default()`.
    pub fn new() -> Self {
        Self::default()
    }
116
117    /// Run a data plane gRPC server using this message processor's drain watch.
118    /// Returns a cancellation token that can be used to stop the server task.
119    pub async fn run_server(
120        &self,
121        config: &ServerConfig,
122    ) -> Result<CancellationToken, DataPathError> {
123        debug!(%config, "starting dataplane server");
124        let watch = self.get_drain_watch()?;
125        // Wrap self in an Arc since the server builder expects an Arc<MessageProcessor>
126        let svc = Arc::new(self.clone());
127        let res = config
128            .run_server(&[DataPlaneServiceServer::from_arc(svc)], watch)
129            .await?;
130
131        Ok(res)
132    }
133
134    pub async fn shutdown(&self) -> Result<(), DataPathError> {
135        // Take the drain signal
136        let signal = self
137            .internal
138            .drain_signal
139            .write()
140            .take()
141            .ok_or(DataPathError::AlreadyClosedError)?;
142
143        // Take drain watch
144        self.internal.drain_watch.write().take();
145
146        // Signal completion to all tasks
147        signal.drain().await;
148
149        Ok(())
150    }
151
152    fn set_tx_control_plane(&self, tx: Sender<Result<Message, Status>>) {
153        let mut tx_guard = self.internal.tx_control_plane.write();
154        *tx_guard = Some(tx);
155    }
156
157    fn get_tx_control_plane(&self) -> Option<Sender<Result<Message, Status>>> {
158        let tx_guard = self.internal.tx_control_plane.read();
159        tx_guard.clone()
160    }
161
    /// Borrows the forwarder held in the shared internal state.
    fn forwarder(&self) -> &Forwarder<Connection> {
        &self.internal.forwarder
    }
165
    /// Removes the entry for `subscription_id` from the remote subscription ACK manager.
    pub(crate) fn remove_sub_ack(&self, subscription_id: u64) {
        self.internal.sub_ack_manager.remove(subscription_id);
    }
169
170    fn get_drain_watch(&self) -> Result<drain::Watch, DataPathError> {
171        self.internal
172            .drain_watch
173            .read()
174            .clone()
175            .ok_or(DataPathError::AlreadyClosedError)
176    }
177
178    /// Re-send `remote_subs` as subscribe messages to `conn_index`.
179    ///
180    /// When `restore_tracking` is `true` (server-side recovery), also re-registers each
181    /// subscription in the local forwarded-subscription table.  This is necessary because
182    /// [`Forwarder::on_connection_drop`] already wiped that state.
183    ///
184    /// When `restore_tracking` is `false` (client-side reconnect), the forwarded-subscription
185    /// table was never cleaned up (reconnect reuses the same slot), so no re-registration is
186    /// needed and double-counting must be avoided.
187    async fn restore_remote_subscriptions(
188        &self,
189        remote_subs: &HashSet<SubscriptionInfo>,
190        conn_index: u64,
191        restore_tracking: bool,
192    ) {
193        for r in remote_subs {
194            let sub_msg = Message::builder()
195                .source(r.source().clone())
196                .destination(r.name().clone())
197                .identity(r.source_identity())
198                .build_subscribe()
199                .unwrap();
200            if let Err(e) = self.send_msg(sub_msg, conn_index).await {
201                error!(
202                    error = %e.chain(), %conn_index,
203                    "error restoring subscription on remote node",
204                );
205            } else if restore_tracking {
206                self.forwarder().on_forwarded_subscription(
207                    r.source().clone(),
208                    r.name().clone(),
209                    r.source_identity().clone(),
210                    conn_index,
211                    true,
212                    r.subscription_id(),
213                );
214            }
215        }
216    }
217
    /// Opens an outgoing connection to a remote node and starts processing its stream.
    ///
    /// Steps, in order: validate `client_config`, build the gRPC channel (aborting if the
    /// drain signal fires first), open the bidirectional stream, register the new
    /// [`Connection`] with the forwarder, spawn the stream-processing task and finally
    /// send a link negotiation message to the peer.
    ///
    /// `existing_conn_index` is forwarded to `Forwarder::on_connection_established`;
    /// `connect` passes `None`.
    /// NOTE(review): presumably `Some(idx)` reuses an existing table slot on reconnect —
    /// confirm against the forwarder.
    ///
    /// Returns the join handle of the processing task together with the connection index.
    ///
    /// # Errors
    ///
    /// * configuration validation or channel creation errors (converted via `?`),
    /// * [`DataPathError::AlreadyClosedError`] if the processor was already shut down,
    /// * [`DataPathError::ShuttingDownError`] if draining starts while the channel is
    ///   being created,
    /// * [`DataPathError::ConnectionTableAddError`] if the connection cannot be registered,
    /// * any error returned by `open_channel` or `process_stream`.
    ///
    /// A failure to send the link negotiation message is only logged.
    async fn try_to_connect(
        &self,
        client_config: ClientConfig,
        local: Option<SocketAddr>,
        remote: Option<SocketAddr>,
        existing_conn_index: Option<u64>,
    ) -> Result<(JoinHandle<()>, u64), DataPathError> {
        client_config.validate()?;
        let mut watch = std::pin::pin!(self.get_drain_watch()?.signaled());

        // Race channel creation against the drain signal so a shutdown is not blocked
        // by a connection attempt that is still in progress.
        let channel = tokio::select! {
            _ = &mut watch => {
                return Err(DataPathError::ShuttingDownError);
            }
            res = client_config.to_channel() => {
                res?
            }
        };

        let mut client = DataPlaneServiceClient::new(channel);
        // `tx` feeds the outbound half of the stream; `rx` is consumed by the gRPC client.
        let (tx, rx) = mpsc::channel(128);

        let stream = client
            .open_channel(Request::new(ReceiverStream::new(rx)))
            .await?;

        // The link_id is embedded in the Connection before it enters the table so that
        // it is never None during the connection's lifetime — including reconnects where
        // the old Connection object is replaced by a fresh one.
        let link_id = client_config.link_id.clone();

        // The same token is stored in the connection and passed to `process_stream`;
        // `disconnect` cancels it to stop the receiving loop.
        let cancellation_token = CancellationToken::new();
        let connection = Connection::new(ConnectionType::Remote, Channel::Client(tx))
            .with_local_addr(local)
            .with_remote_addr(remote)
            .with_config_data(Some(client_config.clone()))
            .with_cancellation_token(Some(cancellation_token.clone()))
            .with_link_id(link_id.clone());

        debug!(
            remote = ?connection.remote_addr(),
            local = ?connection.local_addr(),
            "new connection initiated locally",
        );

        // insert connection into connection table
        let conn_index = self
            .forwarder()
            .on_connection_established(connection, existing_conn_index)
            .ok_or(DataPathError::ConnectionTableAddError)?;

        debug!(
            %conn_index,
            is_local = false,
            "new connection index",
        );

        // Start loop to process messages
        let handle = self.process_stream(
            stream.into_inner(),
            conn_index,
            Some(client_config.clone()),
            cancellation_token,
            false,
            false,
        )?;

        // Send the link negotiation message to the remote peer.
        // Old SLIM instances that do not understand this message will silently drop it.
        let negotiation_msg =
            ProtoMessage::builder().build_link_negotiation(&link_id, local_version(), false);
        if let Err(e) = self.send_msg(negotiation_msg, conn_index).await {
            debug!(
                %conn_index,
                error = %e.chain(),
                "failed to send link negotiation (remote may be an older SLIM instance)",
            );
        }

        Ok((handle, conn_index))
    }
299
    /// Connects to a remote node described by `client_config`.
    ///
    /// Delegates to `try_to_connect` with no existing connection index and returns the
    /// join handle of the stream-processing task together with the connection index.
    ///
    /// # Errors
    ///
    /// Propagates any [`DataPathError`] returned by `try_to_connect`.
    pub async fn connect(
        &self,
        client_config: ClientConfig,
        local: Option<SocketAddr>,
        remote: Option<SocketAddr>,
    ) -> Result<(JoinHandle<()>, u64), DataPathError> {
        self.try_to_connect(client_config, local, remote, None)
            .await
    }
309
310    pub fn disconnect(&self, conn: u64) -> Result<ClientConfig, DataPathError> {
311        let connection = match self.forwarder().get_connection(conn) {
312            Some(c) => c,
313            None => {
314                error!(%conn, "error handling disconnect: connection unknown");
315                return Err(DataPathError::DisconnectionError(conn));
316            }
317        };
318
319        let token = match connection.cancellation_token() {
320            Some(t) => t,
321            None => {
322                error!(%conn, "error handling disconnect: missing cancellation token");
323                return Err(DataPathError::DisconnectionError(conn));
324            }
325        };
326
327        // Cancel receiving loop; this triggers deletion of connection state.
328        token.cancel();
329
330        connection
331            .config_data()
332            .cloned()
333            .ok_or(DataPathError::DisconnectionError(conn))
334    }
335
336    #[tracing::instrument(skip_all, fields(service_id = %self.internal.service_id))]
337    pub fn register_local_connection(
338        &self,
339        from_control_plane: bool,
340    ) -> Result<
341        (
342            u64,
343            tokio::sync::mpsc::Sender<Result<Message, Status>>,
344            tokio::sync::mpsc::Receiver<Result<Message, Status>>,
345        ),
346        DataPathError,
347    > {
348        // create a pair tx, rx to be able to send messages with the standard processing loop
349        let (tx1, rx1) = mpsc::channel(512);
350
351        debug!("establishing new local app connection");
352
353        // create a pair tx, rx to be able to receive messages and insert it into the connection table
354        let (tx2, rx2) = mpsc::channel(512);
355
356        // if the call is coming from the control plane set the tx channel
357        // we assume to talk to a single control plane so set the channel only once
358        if from_control_plane && self.get_tx_control_plane().is_none() {
359            self.set_tx_control_plane(tx2.clone());
360        }
361
362        // create a connection
363        let cancellation_token = CancellationToken::new();
364        let connection = Connection::new(ConnectionType::Local, Channel::Server(tx2))
365            .with_cancellation_token(Some(cancellation_token.clone()));
366
367        // add it to the connection table
368        let conn_id = self
369            .forwarder()
370            .on_connection_established(connection, None)
371            .unwrap();
372
373        debug!(%conn_id, "local connection established");
374        info!(telemetry = true, counter.num_active_connections = 1);
375
376        // this loop will process messages from the local app
377        self.process_stream(
378            ReceiverStream::new(rx1),
379            conn_id,
380            None,
381            cancellation_token,
382            true,
383            from_control_plane,
384        )?;
385
386        // return the conn_id and  handles to be used to send and receive messages
387        Ok((conn_id, tx1, rx2))
388    }
389
    /// Sends `msg` on connection `out_conn`.
    ///
    /// With the `otel_tracing` feature enabled the message is first passed to
    /// `otel_tracing::prepare_outbound_msg`, which is why the parameter is declared
    /// `mut` only in that configuration. The message is then handed to `send_msg_raw`.
    ///
    /// # Errors
    ///
    /// Propagates the [`DataPathError`] returned by `send_msg_raw`.
    pub async fn send_msg(
        &self,
        #[cfg(feature = "otel_tracing")] mut msg: Message,
        #[cfg(not(feature = "otel_tracing"))] msg: Message,
        out_conn: u64,
    ) -> Result<(), DataPathError> {
        #[cfg(feature = "otel_tracing")]
        otel_tracing::prepare_outbound_msg(
            &mut msg,
            "send_message",
            &self.internal.service_id,
            otel_tracing::SpanTarget::Connection(out_conn),
        );
        self.send_msg_raw(msg, out_conn).await
    }
405
406    async fn send_msg_raw(&self, mut msg: Message, out_conn: u64) -> Result<(), DataPathError> {
407        let connection = self.forwarder().get_connection(out_conn);
408        match connection {
409            Some(conn) => {
410                msg.clear_slim_header();
411                match conn.channel() {
412                    Channel::Server(s) => {
413                        s.send(Ok(msg))
414                            .await
415                            .map_err(|e| DataPathError::MessageProcessingError {
416                                source: Box::new(DataPathError::ConnectionNotFound(out_conn)),
417                                msg: Box::new(e.0.unwrap_or_default()),
418                            })
419                    }
420                    Channel::Client(s) => {
421                        s.send(msg)
422                            .await
423                            .map_err(|e| DataPathError::MessageProcessingError {
424                                source: Box::new(DataPathError::ConnectionNotFound(out_conn)),
425                                msg: Box::new(e.0),
426                            })
427                    }
428                }
429            }
430            None => Err(DataPathError::ConnectionNotFound(out_conn)),
431        }
432    }
433
434    async fn match_and_forward_msg(
435        &self,
436        #[cfg(feature = "otel_tracing")] mut msg: Message,
437        #[cfg(not(feature = "otel_tracing"))] msg: Message,
438        name: Name,
439        in_connection: u64,
440        fanout: u32,
441    ) -> Result<(), DataPathError> {
442        debug!(
443            %name,
444            %fanout,
445            "match and forward message"
446        );
447
448        // if the message already contains an output connection, use that one
449        // without performing any match in the subscription table
450        if let Some(val) = msg.get_forward_to() {
451            debug!(conn = %val, "forwarding message to connection");
452            return self.send_msg(msg, val).await;
453        }
454
455        match self
456            .forwarder()
457            .on_publish_msg_match(name, in_connection, fanout)
458        {
459            Ok(out_vec) => {
460                let len = out_vec.len();
461                // Single destination: preserve per-connection span attributes.
462                if len == 1 {
463                    return self.send_msg(msg, out_vec[0]).await;
464                }
465
466                #[cfg(feature = "otel_tracing")]
467                otel_tracing::prepare_fanout_msg(
468                    &mut msg,
469                    "send_message",
470                    &self.internal.service_id,
471                    len as u32,
472                );
473
474                let mut i = 0usize;
475                while i < len - 1 {
476                    self.send_msg_raw(msg.clone(), out_vec[i]).await?;
477                    i += 1;
478                }
479                self.send_msg_raw(msg, out_vec[i]).await?;
480                Ok(())
481            }
482            Err(e) => Err(DataPathError::MessageProcessingError {
483                source: Box::new(e),
484                msg: Box::new(msg),
485            }),
486        }
487    }
488
489    /// Dispatch an inbound Link message to the appropriate handler.
490    ///
491    /// Link messages are link-local and must never be processed for local connections
492    /// (they are only exchanged between SLIM nodes).
493    async fn handle_link_message(
494        &self,
495        link: ProtoLink,
496        conn_index: u64,
497        is_local: bool,
498    ) -> Result<(), DataPathError> {
499        if is_local {
500            debug!(%conn_index, "ignoring link message received on local connection");
501            return Ok(());
502        }
503        match link.link_type {
504            Some(ProtoLinkType::LinkNegotiation(payload)) => {
505                self.handle_link_negotiation(&payload, conn_index).await
506            }
507            None => {
508                debug!(%conn_index, "received link message with unset link_type");
509                Ok(())
510            }
511        }
512    }
513
    /// Handle an inbound link negotiation message.
    ///
    /// On request (`is_reply == false`): validate the client-provided `link_id` as UUID v4,
    /// atomically store both fields under one lock, then echo back a reply.
    ///
    /// On reply (`is_reply == true`): verify the echoed `link_id` matches what we sent, then
    /// atomically store the remote version.  No further reply is sent, preventing echo loops.
    ///
    /// Both methods hold a single write lock for validation and mutation, eliminating TOCTOU races.
    /// NOTE(review): the validation and locking described above live in
    /// `Connection::complete_negotiation_as_client` / `complete_negotiation_as_server`,
    /// which are not visible here — confirm against `Connection`.
    ///
    /// On the request path, a successful negotiation also triggers route recovery when the
    /// recovery table holds an entry for `link_id`.
    ///
    /// Every rejection (unknown connection, wrong role, unparsable version, negotiation
    /// refused by the connection) is logged at debug level and reported as `Ok(())`; this
    /// function has no path that returns `Err`.
    async fn handle_link_negotiation(
        &self,
        payload: &LinkNegotiationPayload,
        in_connection: u64,
    ) -> Result<(), DataPathError> {
        let link_id = &payload.link_id;
        let remote_version = &payload.slim_version;

        debug!(
            %in_connection,
            %link_id,
            %remote_version,
            is_reply = payload.is_reply,
            "received link negotiation",
        );

        let Some(conn) = self.forwarder().get_connection(in_connection) else {
            debug!(%in_connection, "ignoring link negotiation request received on unknown connection");
            return Ok(());
        };

        // Role check: clients must only receive replies; servers must only receive requests.
        match (conn.is_outgoing(), payload.is_reply) {
            (true, false) => {
                debug!(%in_connection, "ignoring link negotiation request received on outgoing connection");
                return Ok(());
            }
            (false, true) => {
                debug!(%in_connection, "ignoring link negotiation reply received on incoming connection");
                return Ok(());
            }
            _ => {}
        }

        // Parse the remote version before any state mutation.
        let version = match semver::Version::parse(remote_version) {
            Ok(v) => v,
            Err(e) => {
                debug!(%in_connection, %remote_version, error = %e, "ignoring link negotiation with unparsable remote SLIM version");
                return Ok(());
            }
        };

        if payload.is_reply {
            // Client path: verifies the echoed link_id matches what we sent and stores the remote
            // version atomically (replay-protected).
            if !conn.complete_negotiation_as_client(link_id, version) {
                debug!(%in_connection, %link_id, "ignoring link negotiation reply");
            }
        } else {
            // Server path: validates link_id as UUID v4, stores it together with the remote
            // version atomically (replay-protected), then echoes a reply.
            if !conn.complete_negotiation_as_server(link_id, version) {
                debug!(%in_connection, %link_id, "ignoring link negotiation request");
                return Ok(());
            }

            // Route recovery: if the peer reconnected with a known link_id, restore all
            // routing state that was preserved during the recovery window.
            // `take` hands the entry over to this call.
            if let Some(entry) = self.internal.recovery_table.take(link_id) {
                info!(%in_connection, %link_id, "recovering routes for reconnected peer");

                // Re-add local routing entries.  A new conn_index was allocated for this
                // connection, so we must re-register each name under the current index,
                // preserving the original subscription IDs so UNSUBSCRIBE messages work.
                // Failures are logged per subscription and do not abort the recovery.
                for (name, sub_ids) in &entry.local_subs {
                    for &subscription_id in sub_ids {
                        if let Err(e) = self.forwarder().on_subscription_msg(
                            name.clone(),
                            in_connection,
                            false,
                            true,
                            subscription_id,
                        ) {
                            error!(
                                error = %e.chain(), %in_connection,
                                "error re-adding local subscription during recovery",
                            );
                        }
                    }
                }

                // Re-send subscriptions to the remote peer and re-register tracking.
                // restore_tracking = true: on_connection_drop already wiped the
                // forwarded-subscription table, so we must rebuild it here.
                self.restore_remote_subscriptions(&entry.remote_subs, in_connection, true)
                    .await;
            }

            // Send reply only after state is committed.
            let reply =
                ProtoMessage::builder().build_link_negotiation(link_id, local_version(), true);
            if let Err(e) = self.send_msg(reply, in_connection).await {
                debug!(
                    %in_connection,
                    error = %e.chain(),
                    "failed to send link negotiation reply",
                );
            }
        }

        Ok(())
    }
626
627    async fn process_publish(&self, msg: Message, in_connection: u64) -> Result<(), DataPathError> {
628        debug!(
629            %in_connection,
630            ?msg,
631            "received publication"
632        );
633
634        // telemetry /////////////////////////////////////////
635        info!(
636            telemetry = true,
637            monotonic_counter.num_messages_by_type = 1,
638            method = "publish"
639        );
640        //////////////////////////////////////////////////////
641
642        // get header
643        let header = msg.get_slim_header();
644
645        let dst = header.get_dst();
646
647        // this function may panic, but at this point we are sure we are processing
648        // a publish message
649        let fanout = msg.get_fanout();
650
651        self.match_and_forward_msg(msg, dst, in_connection, fanout)
652            .await
653    }
654
655    pub(crate) async fn send_subscription_ack(
656        &self,
657        in_connection: u64,
658        subscription_id: u64,
659        result: &Result<(), DataPathError>,
660    ) {
661        let (success, error_msg) = match result {
662            Ok(()) => (true, String::new()),
663            Err(e) => (false, e.to_string()),
664        };
665
666        let ack_msg =
667            Message::builder().build_subscription_ack(subscription_id, success, error_msg);
668
669        if let Err(e) = self.send_msg(ack_msg, in_connection).await {
670            error!(error = %e.chain(), "failed to send subscription ack");
671        }
672    }
673
674    async fn process_subscription_update_and_forward(
675        &self,
676        msg: Message,
677        conn: u64,
678        forward: Option<u64>,
679        add: bool,
680        subscription_id: u64,
681    ) -> Result<(), DataPathError> {
682        let dst = msg.get_dst();
683
684        // As connection is deleted only after processing, at this point it must exist.
685        let connection = if let Some(c) = self.forwarder().get_connection(conn) {
686            c
687        } else {
688            return Err(DataPathError::MessageProcessingError {
689                source: Box::new(DataPathError::ConnectionNotFound(conn)),
690                msg: Box::new(msg),
691            });
692        };
693
694        debug!(
695            %conn,
696            %dst,
697            is_local = connection.is_local_connection(),
698            "processing {}subscription",
699            if add { "" } else { "un" }
700        );
701
702        self.forwarder().on_subscription_msg(
703            dst.clone(),
704            conn,
705            connection.is_local_connection(),
706            add,
707            subscription_id,
708        )?;
709
710        match forward {
711            None => Ok(()),
712            Some(out_conn) => {
713                debug!(
714                    %out_conn,
715                    "forwarding {}subscription to connection",
716                    if add { "" } else { "un" }
717                );
718
719                let source = msg.get_source();
720                let identity = msg.get_identity();
721
722                self.send_msg(msg, out_conn).await.map(|_| {
723                    self.forwarder().on_forwarded_subscription(
724                        source,
725                        dst,
726                        identity,
727                        out_conn,
728                        add,
729                        subscription_id,
730                    );
731                })
732            }
733        }
734    }
735
736    // Use a single function to process subscription and unsubscription packets.
737    // The flag add = true is used to add a new subscription while add = false
738    // is used to remove existing state
739    async fn process_subscription(
740        &self,
741        msg: Message,
742        in_connection: u64,
743        add: bool,
744    ) -> Result<(), DataPathError> {
745        debug!(
746            %in_connection,
747            ?msg,
748            "received {}subscription",
749            if add { "" } else { "un" }
750        );
751
752        // telemetry /////////////////////////////////////////
753        info!(
754            telemetry = true,
755            monotonic_counter.num_messages_by_type = 1,
756            message_type = { if add { "subscribe" } else { "unsubscribe" } }
757        );
758        //////////////////////////////////////////////////////
759
760        let subscription_id = msg.get_subscription_id();
761
762        debug!(?subscription_id, "received subscription id");
763
764        // get header
765        let header = msg.get_slim_header();
766
767        // get in and out connections
768        let (in_conn, recv_from, forward) = header.get_connections();
769        let in_conn = recv_from.unwrap_or(in_conn);
770
771        // Never forward subscriptions to local connections (they are local apps whose
772        // routes are already set locally).
773        let forward = forward.filter(|&out| {
774            self.forwarder()
775                .get_connection(out)
776                .map(|c| !c.is_local_connection())
777                .unwrap_or(true)
778        });
779
780        // If forwarding to a remote ACK-capable node (v≥1.2.0), use the remote ack path:
781        // update local state now, then asynchronously forward and wait for the remote ACK
782        // before notifying the upstream requester.
783        let use_remote_ack = forward
784            .and_then(|out| self.forwarder().get_connection(out))
785            .map(|c| crate::subscription_ack::supports(&c))
786            .unwrap_or(false);
787
788        if forward.is_some() && !use_remote_ack {
789            debug!(
790                forward_to = forward,
791                "subscription: remote ack not available, link negotiation may not have completed yet"
792            );
793        }
794
795        // As connection is deleted only after processing, at this point it must exist.
796        let Some(connection) = self.forwarder().get_connection(in_conn) else {
797            if let Some(id) = subscription_id {
798                debug!(%in_conn, "connection not found, sending error ack");
799                self.send_subscription_ack(
800                    in_connection,
801                    id,
802                    &Err(DataPathError::ConnectionNotFound(in_conn)),
803                )
804                .await;
805            }
806            return Err(DataPathError::MessageProcessingError {
807                source: Box::new(DataPathError::ConnectionNotFound(in_conn)),
808                msg: Box::new(msg),
809            });
810        };
811
812        // Do not process subscriptions forwarded back to local connections.
813        if recv_from.is_some() && connection.is_local_connection() {
814            if let Some(id) = subscription_id {
815                debug!(%in_conn, "subscription looped back to local connection, acking ok");
816                self.send_subscription_ack(in_connection, id, &Ok(())).await;
817            }
818            return Ok(());
819        }
820
821        debug!(use_remote_ack, dst = %msg.get_dst(), forward_to = forward, "subscription: ack path decision");
822
823        let sub_id = subscription_id.unwrap_or(0);
824
        // Always register the subscription: at this point we might not have received the link
        // negotiation yet, so the other side might reply just after.
827        let rx = self.internal.sub_ack_manager.register(sub_id);
828
829        // Update local state and forward the subscription.
830        let result = self
831            .process_subscription_update_and_forward(msg.clone(), in_conn, forward, add, sub_id)
832            .await;
833
834        // Remote-ack path: on success, spawn a retry loop that waits for the
835        // downstream ACK (the initial send was already done above) and retries
836        // on timeout.
837        if use_remote_ack && result.is_ok() {
838            let out_conn = forward.unwrap();
839
840            tokio::spawn(crate::subscription_ack::retry_loop(
841                self.clone(),
842                sub_id,
843                msg,
844                out_conn,
845                in_connection,
846                subscription_id,
847                rx,
848            ));
849
850            return Ok(());
851        }
852
853        // Default path (or remote-ack error fallback): ACK the requester immediately.
854        if let Some(id) = subscription_id {
855            debug!(%in_connection, ok = result.is_ok(), "sending immediate subscription ack");
856            self.send_subscription_ack(in_connection, id, &result).await;
857        }
858
859        result
860    }
861
862    pub async fn process_message(
863        &self,
864        msg: Message,
865        in_connection: u64,
866        is_local: bool,
867    ) -> Result<(), DataPathError> {
868        match msg.message_type {
869            Some(SubscribeType(_)) => self.process_subscription(msg, in_connection, true).await,
870            Some(UnsubscribeType(_)) => self.process_subscription(msg, in_connection, false).await,
871            Some(PublishType(_)) => self.process_publish(msg, in_connection).await,
872            Some(LinkType(link)) => {
873                self.handle_link_message(link, in_connection, is_local)
874                    .await
875            }
876            Some(SubscriptionAckType(ack)) => {
877                let result = if ack.success {
878                    Ok(())
879                } else {
880                    Err(DataPathError::RemoteSubscriptionAckError(ack.error.clone()))
881                };
882                self.internal
883                    .sub_ack_manager
884                    .resolve(ack.subscription_id, result);
885                Ok(())
886            }
887            None => unreachable!(
888                "message type not set; validate() must be called before process_message"
889            ),
890        }
891    }
892
893    async fn handle_new_message(
894        &self,
895        conn_index: u64,
896        is_local: bool,
897        mut msg: Message,
898    ) -> Result<(), DataPathError> {
899        debug!(%conn_index, "received message from connection");
900        info!(
901            telemetry = true,
902            monotonic_counter.num_processed_messages = 1
903        );
904
905        // validate message
906        if let Err(err) = msg.validate() {
907            info!(
908                telemetry = true,
909                monotonic_counter.num_messages_by_type = 1,
910                message_type = "none"
911            );
912
913            let ret_err = DataPathError::MessageProcessingError {
914                source: Box::new(err.into()),
915                msg: Box::new(msg),
916            };
917
918            return Err(ret_err);
919        }
920
921        // Link and SubscriptionAck messages have no SLIM header: skip header processing and telemetry span.
922        if !msg.is_link() && !msg.is_subscription_ack() {
923            // add incoming connection to the SLIM header
924            msg.set_incoming_conn(Some(conn_index));
925
926            #[cfg(feature = "otel_tracing")]
927            otel_tracing::prepare_inbound_msg(
928                &mut msg,
929                "process_local",
930                &self.internal.service_id,
931                conn_index,
932                is_local,
933            );
934        }
935
936        match self.process_message(msg, conn_index, is_local).await {
937            Ok(_) => Ok(()),
938            Err(e) => {
939                // telemetry /////////////////////////////////////////
940                info!(
941                    telemetry = true,
942                    monotonic_counter.num_message_process_errors = 1
943                );
944                //////////////////////////////////////////////////////
945
946                // drop message
947                Err(e)
948            }
949        }
950    }
951
952    #[tracing::instrument(skip_all, fields(service_id = %self.internal.service_id, conn_index))]
953    async fn send_error_to_local_app(&self, conn_index: u64, err: DataPathError) {
954        debug!(%conn_index, "sending error to local application");
955        let connection = self.forwarder().get_connection(conn_index);
956        match connection {
957            Some(conn) => {
958                debug!("try to notify the error to the local application");
959                if let Channel::Server(tx) = conn.channel() {
960                    // If the error contains the message, try to extract some session information
961                    let session_ctx = match &err {
962                        DataPathError::MessageProcessingError { msg, .. } => {
963                            MessageContext::from_msg(msg)
964                        }
965                        _ => None,
966                    };
967
968                    // Make error message with optional session context using shared type
969                    let payload = crate::errors::ErrorPayload::new(err.to_string(), session_ctx);
970                    let error_message = payload.to_json_string();
971
972                    // create Status error
973                    let status = Status::new(tonic::Code::Internal, error_message);
974
975                    if tx.send(Err(status)).await.is_err() {
976                        debug!(error = %err.chain(), "unable to notify the error to the local app");
977                    }
978                }
979            }
980            None => {
981                error!(
982                    "error sending error to local app: connection {:?} not found",
983                    conn_index
984                );
985            }
986        }
987    }
988
989    #[tracing::instrument(skip_all, fields(service_id = %self.internal.service_id, conn_index))]
990    async fn reconnect(
991        &self,
992        client_conf: ClientConfig,
993        conn_index: u64,
994        cancellation_token: &CancellationToken,
995    ) -> bool {
996        info!("connection lost with remote endpoint, attempting to reconnect");
997
998        // These are the subscriptions that we forwarded to the remote SLIM on
999        // this connection. It is necessary to restore them to keep receive the messages
1000        // The connections on the local subscription table (created using the set_route command)
1001        // are still there and will be removed only if the reconnection process fails.
1002        let remote_subscriptions = self
1003            .forwarder()
1004            .get_subscriptions_forwarded_on_connection(conn_index);
1005
1006        tokio::select! {
1007            _ = cancellation_token.cancelled() => {
1008                debug!("cancellation token signaled, stopping reconnection process");
1009                false
1010            }
1011            res = self.try_to_connect(client_conf, None, None, Some(conn_index)) => {
1012                match res {
1013                    Ok(_) => {
1014                        info!("connection re-established successfully");
1015                        // Restore subscriptions on the remote node.
1016                        // restore_tracking = false: the forwarded-subscription table was not
1017                        // cleaned up (same conn_index is reused), so we only replay the
1018                        // messages without re-registering local tracking state.
1019                        self.restore_remote_subscriptions(
1020                            &remote_subscriptions,
1021                            conn_index,
1022                            false,
1023                        )
1024                        .await;
1025                        true
1026                    }
1027                    Err(e) => {
1028                        error!(error = %e.chain(), "unable to reconnect to remote node");
1029                        false
1030                    }
1031                }
1032            }
1033        }
1034    }
1035
1036    /// Send an UNSUBSCRIBE message to the control plane for each subscription in `local_subs`.
1037    ///
1038    /// This is the single authoritative place that constructs and delivers CP unsubscribe
1039    /// notifications on connection loss, used by both the immediate cleanup path and the deferred
1040    /// TTL-expiry path.
1041    async fn notify_control_plane_subscriptions_lost(
1042        tx_cp: Option<Sender<Result<Message, Status>>>,
1043        local_subs: HashMap<Name, HashSet<u64>>,
1044        conn_index: u64,
1045    ) {
1046        let Some(tx) = tx_cp else { return };
1047        for local_sub in local_subs.into_keys() {
1048            debug!(
1049                %local_sub,
1050                "notify control plane about lost subscription",
1051            );
1052            let msg = Message::builder()
1053                .source(local_sub.clone())
1054                .destination(local_sub.clone())
1055                .flags(SlimHeaderFlags::default().with_recv_from(conn_index))
1056                .build_unsubscribe()
1057                .unwrap();
1058            if let Err(e) = tx.send(Ok(msg)).await {
1059                debug!(
1060                    %local_sub,
1061                    error = %e.chain(),
1062                    "failed to send unsubscribe to control plane",
1063                );
1064            }
1065        }
1066    }
1067
1068    fn process_stream(
1069        &self,
1070        mut stream: impl Stream<Item = Result<Message, Status>> + Unpin + Send + 'static,
1071        conn_index: u64,
1072        client_config: Option<ClientConfig>,
1073        cancellation_token: CancellationToken,
1074        is_local: bool,
1075        from_control_plane: bool,
1076    ) -> Result<JoinHandle<()>, DataPathError> {
1077        // Clone self to be able to move it into the spawned task
1078        let self_clone = self.clone();
1079        let token_clone = cancellation_token.clone();
1080        let client_conf_clone = client_config.clone();
1081        let tx_cp: Option<Sender<Result<Message, Status>>> = self.get_tx_control_plane();
1082        let watch = self.get_drain_watch()?;
1083        let span = tracing::info_span!(
1084            "process_stream",
1085            service_id = %self.internal.service_id,
1086            %conn_index,
1087            is_local,
1088        );
1089        let handle = tokio::spawn(async move {
1090            let mut try_to_reconnect = true;
1091
1092            let mut watch = std::pin::pin!(watch.signaled());
1093            loop {
1094                tokio::select! {
1095                    next = stream.next() => {
1096                        match next {
1097                            Some(result) => {
1098                                match result {
1099                                    Ok(msg) => {
1100                                        // check if we need to send the message to the control plane
1101                                        // we send the message if
1102                                        // 1. the message is coming from remote
1103                                        // 2. it is not coming from the control plane itself
1104                                        // 3. the control plane exists
1105                                        if !is_local && !from_control_plane && let Some(txcp) = &tx_cp {
1106                                            match msg.get_type() {
1107                                                PublishType(_) | LinkType(_) | SubscriptionAckType(_) => {/* do nothing */}
1108                                                _ => {
1109                                                    // send subscriptions and unsubscriptions
1110                                                    // to the control plane
1111                                                    let _ = txcp.send(Ok(msg.clone())).await;
1112                                                }
1113                                            }
1114                                        }
1115
1116                                        if let Err(e) = self_clone.handle_new_message(conn_index, is_local, msg).await {
1117                                            debug!(%conn_index, error = %e.chain(), "error processing incoming message");
1118                                            // If the message is coming from a local app, notify it
1119                                            if is_local {
1120                                                // try to forward error to the local app
1121                                                self_clone.send_error_to_local_app(conn_index, e).await;
1122                                            }
1123                                        }
1124                                    }
1125                                    Err(e) => {
1126                                        if let Some(io_err) = MessageProcessor::match_for_io_error(&e) {
1127                                            if io_err.kind() == std::io::ErrorKind::BrokenPipe {
1128                                                info!(%conn_index, "connection closed by peer");
1129                                            }
1130                                        } else {
1131                                            error!(error = %e.chain(), "error receiving messages");
1132                                        }
1133                                        break;
1134                                    }
1135                                }
1136                            }
1137                            None => {
1138                                debug!(%conn_index, "end of stream");
1139                                break;
1140                            }
1141                        }
1142                    }
1143                    _ = &mut watch => {
1144                        info!(%conn_index, "shutting down stream on drain");
1145                        try_to_reconnect = false;
1146                        break;
1147                    }
1148                    _ = token_clone.cancelled() => {
1149                        info!(%conn_index, "shutting down stream on cancellation token");
1150                        try_to_reconnect = false;
1151                        break;
1152                    }
1153                }
1154            }
1155
1156            // we drop rx now as otherwise the connection will be closed only
1157            // when the task is dropped and we want to make sure that the rx
1158            // stream is closed as soon as possible
1159            drop(stream);
1160
1161            // Save whether this is a client-initiated connection before client_conf_clone
1162            // is consumed by the if-let below.
1163            let is_client_connection = client_conf_clone.is_some();
1164            let mut connected = false;
1165
1166            if try_to_reconnect && let Some(config) = client_conf_clone {
1167                // Break the span chain: reconnect → try_to_connect → process_stream
1168                // would otherwise nest under the current process_stream span on every
1169                // reconnection, growing the span hierarchy unboundedly.
1170                connected = self_clone.reconnect(config, conn_index, &token_clone)
1171                    .instrument(tracing::Span::none())
1172                    .await;
1173            } else {
1174                debug!(%conn_index, "close connection")
1175            }
1176
1177            if !connected {
1178                // For incoming (server) connections capture the link_id before
1179                // on_connection_drop removes the connection from the table.
1180                let link_id = if !is_local && !is_client_connection {
1181                    self_clone
1182                        .forwarder()
1183                        .get_connection(conn_index)
1184                        .and_then(|c| c.link_id())
1185                } else {
1186                    None
1187                };
1188
1189                // Delete connection state from all tables.
1190                let (local_subs, remote_subs) = self_clone
1191                    .forwarder()
1192                    .on_connection_drop(conn_index, is_local);
1193
1194                let recovery_enabled =
1195                    !self_clone.internal.recovery_table.ttl().is_zero();
1196
1197                if let Some(lid) = link_id.filter(|_| recovery_enabled) {
1198                    // Server connection with a known link_id: preserve routing state and
1199                    // suppress the control-plane notification for the duration of the TTL
1200                    // to give the peer a chance to reconnect.
1201                    info!(
1202                        %conn_index, %lid,
1203                        "connection lost, storing recovery state (TTL: {:?})",
1204                        self_clone.internal.recovery_table.ttl(),
1205                    );
1206                    self_clone
1207                        .internal
1208                        .recovery_table
1209                        .store(lid.clone(), local_subs, remote_subs);
1210
1211                    // Spawn a TTL task that fires the CP notification if recovery never happens.
1212                    if let Ok(drain) = self_clone.get_drain_watch() {
1213                        let tx_cp_ttl = tx_cp;
1214                        let mp = self_clone.clone();
1215                        self_clone.internal.recovery_table.spawn_ttl_task(
1216                            lid,
1217                            drain,
1218                            move |entry| async move {
1219                                info!("recovery window expired, notifying control plane");
1220                                // Only unsubscribe names that are no longer reachable.
1221                                // If the peer reconnected with a different link_id, the
1222                                // CP will have already pushed the same subscriptions on
1223                                // the new connection — those names are still in the
1224                                // subscription table and must not be torn down.
1225                                let unreachable = entry
1226                                    .local_subs
1227                                    .into_iter()
1228                                    .filter(|(name, _)| {
1229                                        mp.forwarder()
1230                                            .on_publish_msg_match(name.clone(), u64::MAX, u32::MAX)
1231                                            .is_err()
1232                                    })
1233                                    .collect();
1234                                MessageProcessor::notify_control_plane_subscriptions_lost(
1235                                    tx_cp_ttl,
1236                                    unreachable,
1237                                    conn_index,
1238                                )
1239                                .await;
1240                            },
1241                        );
1242                    }
1243                } else {
1244                    // No link_id (local connection, client that failed to reconnect, or a peer
1245                    // that does not support link negotiation): notify the control plane now.
1246                    if !is_local {
1247                        MessageProcessor::notify_control_plane_subscriptions_lost(
1248                            tx_cp, local_subs, conn_index,
1249                        )
1250                        .await;
1251                    }
1252                }
1253
1254                info!(telemetry = true, counter.num_active_connections = -1);
1255            }
1256        }.instrument(span));
1257
1258        Ok(handle)
1259    }
1260
1261    fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
1262        let mut err: &(dyn std::error::Error + 'static) = err_status;
1263
1264        loop {
1265            if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1266                return Some(io_err);
1267            }
1268
1269            // h2::Error do not expose std::io::Error with `source()`
1270            // https://github.com/hyperium/h2/pull/462
1271            if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1272                && let Some(io_err) = h2_err.get_io()
1273            {
1274                return Some(io_err);
1275            }
1276
1277            err = err.source()?;
1278        }
1279    }
1280
1281    pub fn subscription_table(&self) -> &SubscriptionTableImpl {
1282        &self.internal.forwarder.subscription_table
1283    }
1284
1285    pub fn connection_table(&self) -> &ConnectionTable<Connection> {
1286        &self.internal.forwarder.connection_table
1287    }
1288}
1289
1290#[tonic::async_trait]
1291impl DataPlaneService for MessageProcessor {
1292    type OpenChannelStream = Pin<Box<dyn Stream<Item = Result<Message, Status>> + Send + 'static>>;
1293
1294    async fn open_channel(
1295        &self,
1296        request: Request<tonic::Streaming<Message>>,
1297    ) -> Result<Response<Self::OpenChannelStream>, Status> {
1298        let remote_addr = request.remote_addr();
1299        let local_addr = request.local_addr();
1300
1301        let stream = request.into_inner();
1302        let (tx, rx) = mpsc::channel(128);
1303
1304        let connection = Connection::new(ConnectionType::Remote, Channel::Server(tx))
1305            .with_remote_addr(remote_addr)
1306            .with_local_addr(local_addr);
1307
1308        debug!(
1309            remote = ?connection.remote_addr(),
1310            local = ?connection.local_addr(),
1311            "new connection received from remote",
1312        );
1313        info!(telemetry = true, counter.num_active_connections = 1);
1314
1315        // insert connection into connection table
1316        let conn_index = self
1317            .forwarder()
1318            .on_connection_established(connection, None)
1319            .unwrap();
1320
1321        self.process_stream(
1322            stream,
1323            conn_index,
1324            None,
1325            CancellationToken::new(),
1326            false,
1327            false,
1328        )
1329        .map_err(|e| {
1330            error!(error = %e.chain(), "error starting new processing stream");
1331            Status::unavailable(format!("error processing stream: {:?}", e))
1332        })?;
1333
1334        let out_stream = ReceiverStream::new(rx);
1335        Ok(Response::new(
1336            Box::pin(out_stream) as Self::OpenChannelStream
1337        ))
1338    }
1339}
1340
1341#[cfg(test)]
1342mod tests {
1343    use slim_config::grpc::client::is_valid_uuid_v4;
1344    use std::time::Duration;
1345
1346    use super::*;
1347    use crate::api::ProtoSubscriptionAck;
1348    use crate::tables::remote_subscription_table::SubscriptionInfo;
1349    use tonic::Status;
1350
1351    async fn assert_failed_subscription_ack_is_sent(add: bool) {
1352        let processor = MessageProcessor::new();
1353        let (in_connection, _tx, mut rx) = processor
1354            .register_local_connection(false)
1355            .expect("failed to create local connection");
1356
1357        let source = Name::from_strings(["org", "ns", "source"]).with_id(1);
1358        let destination = Name::from_strings(["org", "ns", "destination"]).with_id(2);
1359        let ack_id: u64 = if add { 1 } else { 2 };
1360        let invalid_connection = u64::MAX - 1;
1361
1362        let builder = Message::builder()
1363            .source(source.clone())
1364            .destination(destination.clone())
1365            .incoming_conn(invalid_connection)
1366            .subscription_id(ack_id);
1367
1368        let msg = if add {
1369            builder.build_subscribe().unwrap()
1370        } else {
1371            builder.build_unsubscribe().unwrap()
1372        };
1373
1374        let result = processor
1375            .process_subscription(msg, in_connection, add)
1376            .await;
1377        assert!(matches!(
1378            result,
1379            Err(DataPathError::MessageProcessingError { .. })
1380        ));
1381
1382        let ack_msg = tokio::time::timeout(Duration::from_secs(1), rx.recv())
1383            .await
1384            .expect("timeout waiting for ack")
1385            .expect("ack channel closed")
1386            .expect("failed to receive ack message");
1387
1388        assert!(matches!(ack_msg.get_type(), SubscriptionAckType(_)));
1389        let ack = ack_msg.get_subscription_ack();
1390        assert_eq!(ack.subscription_id, ack_id);
1391        assert!(!ack.success, "failed ack should have success=false");
1392        assert!(
1393            !ack.error.is_empty(),
1394            "failed ack should include an error message"
1395        );
1396    }
1397
1398    #[tokio::test]
1399    async fn test_process_subscription_sends_failed_ack_on_subscribe_error() {
1400        assert_failed_subscription_ack_is_sent(true).await;
1401    }
1402
1403    #[tokio::test]
1404    async fn test_process_subscription_sends_failed_ack_on_unsubscribe_error() {
1405        assert_failed_subscription_ack_is_sent(false).await;
1406    }
1407
1408    #[test]
1409    fn test_is_valid_uuid_v4_accepts_v4() {
1410        let id = uuid::Uuid::new_v4().to_string();
1411        assert!(is_valid_uuid_v4(&id));
1412    }
1413
1414    #[test]
1415    fn test_is_valid_uuid_v4_rejects_non_uuid_string() {
1416        assert!(!is_valid_uuid_v4("not-a-uuid"));
1417        assert!(!is_valid_uuid_v4(""));
1418    }
1419
1420    #[test]
1421    fn test_is_valid_uuid_v4_rejects_non_v4_uuid() {
1422        // Version 1 UUID (time-based).
1423        assert!(!is_valid_uuid_v4("00000000-0000-1000-8000-000000000000"));
1424    }
1425
1426    // ── handle_link_message ───────────────────────────────────────────────────
1427
1428    #[tokio::test]
1429    async fn test_handle_link_message_is_local_ignored() {
1430        let processor = MessageProcessor::new();
1431        let link = ProtoLink { link_type: None };
1432        assert!(processor.handle_link_message(link, 0, true).await.is_ok());
1433    }
1434
1435    #[tokio::test]
1436    async fn test_handle_link_message_none_link_type_ignored() {
1437        let processor = MessageProcessor::new();
1438        let link = ProtoLink { link_type: None };
1439        assert!(processor.handle_link_message(link, 0, false).await.is_ok());
1440    }
1441
1442    // ── handle_link_negotiation ───────────────────────────────────────────────
1443
1444    fn make_server_conn(
1445        processor: &MessageProcessor,
1446    ) -> (u64, tokio::sync::mpsc::Receiver<Result<Message, Status>>) {
1447        let (tx, rx) = mpsc::channel(16);
1448        let conn = Connection::new(ConnectionType::Remote, Channel::Server(tx));
1449        let conn_id = processor
1450            .forwarder()
1451            .on_connection_established(conn, None)
1452            .unwrap();
1453        (conn_id, rx)
1454    }
1455
1456    fn make_client_conn(
1457        processor: &MessageProcessor,
1458    ) -> (u64, tokio::sync::mpsc::Receiver<Message>) {
1459        let (tx, rx) = mpsc::channel(16);
1460        let conn = Connection::new(ConnectionType::Remote, Channel::Client(tx));
1461        let conn_id = processor
1462            .forwarder()
1463            .on_connection_established(conn, None)
1464            .unwrap();
1465        (conn_id, rx)
1466    }
1467
1468    #[tokio::test]
1469    async fn test_handle_link_negotiation_unknown_connection_ignored() {
1470        let processor = MessageProcessor::new();
1471        let payload = LinkNegotiationPayload {
1472            link_id: uuid::Uuid::new_v4().to_string(),
1473            slim_version: "1.0.0".into(),
1474            is_reply: false,
1475        };
1476        assert!(
1477            processor
1478                .handle_link_negotiation(&payload, u64::MAX)
1479                .await
1480                .is_ok()
1481        );
1482    }
1483
1484    #[tokio::test]
1485    async fn test_handle_link_negotiation_role_outgoing_receives_request_ignored() {
1486        let processor = MessageProcessor::new();
1487        let (conn_id, _rx) = make_client_conn(&processor);
1488        let payload = LinkNegotiationPayload {
1489            link_id: uuid::Uuid::new_v4().to_string(),
1490            slim_version: "1.0.0".into(),
1491            is_reply: false, // request on outgoing connection → ignored
1492        };
1493        assert!(
1494            processor
1495                .handle_link_negotiation(&payload, conn_id)
1496                .await
1497                .is_ok()
1498        );
1499        assert!(
1500            processor
1501                .forwarder()
1502                .get_connection(conn_id)
1503                .unwrap()
1504                .remote_slim_version()
1505                .is_none()
1506        );
1507    }
1508
1509    #[tokio::test]
1510    async fn test_handle_link_negotiation_role_incoming_receives_reply_ignored() {
1511        let processor = MessageProcessor::new();
1512        let (conn_id, _rx) = make_server_conn(&processor);
1513        let payload = LinkNegotiationPayload {
1514            link_id: uuid::Uuid::new_v4().to_string(),
1515            slim_version: "1.0.0".into(),
1516            is_reply: true, // reply on incoming connection → ignored
1517        };
1518        assert!(
1519            processor
1520                .handle_link_negotiation(&payload, conn_id)
1521                .await
1522                .is_ok()
1523        );
1524        assert!(
1525            processor
1526                .forwarder()
1527                .get_connection(conn_id)
1528                .unwrap()
1529                .remote_slim_version()
1530                .is_none()
1531        );
1532    }
1533
1534    #[tokio::test]
1535    async fn test_handle_link_negotiation_unparsable_version_ignored() {
1536        let processor = MessageProcessor::new();
1537        let (conn_id, _rx) = make_server_conn(&processor);
1538        let payload = LinkNegotiationPayload {
1539            link_id: uuid::Uuid::new_v4().to_string(),
1540            slim_version: "not-semver".into(),
1541            is_reply: false,
1542        };
1543        assert!(
1544            processor
1545                .handle_link_negotiation(&payload, conn_id)
1546                .await
1547                .is_ok()
1548        );
1549        assert!(
1550            processor
1551                .forwarder()
1552                .get_connection(conn_id)
1553                .unwrap()
1554                .remote_slim_version()
1555                .is_none()
1556        );
1557    }
1558
1559    #[tokio::test]
1560    async fn test_handle_link_negotiation_server_invalid_uuid_ignored() {
1561        let processor = MessageProcessor::new();
1562        let (conn_id, _rx) = make_server_conn(&processor);
1563        let payload = LinkNegotiationPayload {
1564            link_id: "not-a-uuid".into(),
1565            slim_version: "1.0.0".into(),
1566            is_reply: false,
1567        };
1568        assert!(
1569            processor
1570                .handle_link_negotiation(&payload, conn_id)
1571                .await
1572                .is_ok()
1573        );
1574        assert!(
1575            processor
1576                .forwarder()
1577                .get_connection(conn_id)
1578                .unwrap()
1579                .remote_slim_version()
1580                .is_none()
1581        );
1582    }
1583
1584    #[tokio::test]
1585    async fn test_handle_link_negotiation_server_happy_path() {
1586        let processor = MessageProcessor::new();
1587        let (conn_id, mut rx) = make_server_conn(&processor);
1588        let link_id = uuid::Uuid::new_v4().to_string();
1589        let payload = LinkNegotiationPayload {
1590            link_id: link_id.clone(),
1591            slim_version: "1.2.3".into(),
1592            is_reply: false,
1593        };
1594        assert!(
1595            processor
1596                .handle_link_negotiation(&payload, conn_id)
1597                .await
1598                .is_ok()
1599        );
1600        let conn = processor.forwarder().get_connection(conn_id).unwrap();
1601        assert_eq!(conn.link_id(), Some(link_id));
1602        assert_eq!(
1603            conn.remote_slim_version(),
1604            Some(semver::Version::parse("1.2.3").unwrap())
1605        );
1606        // A reply must have been sent.
1607        let reply = rx.try_recv().expect("reply should be sent").unwrap();
1608        assert!(reply.is_link());
1609    }
1610
1611    #[tokio::test]
1612    async fn test_handle_link_negotiation_server_replay_protection() {
1613        let processor = MessageProcessor::new();
1614        let (conn_id, mut rx) = make_server_conn(&processor);
1615        let link_id = uuid::Uuid::new_v4().to_string();
1616        let payload = LinkNegotiationPayload {
1617            link_id: link_id.clone(),
1618            slim_version: "1.0.0".into(),
1619            is_reply: false,
1620        };
1621        // First request: accepted, reply sent.
1622        assert!(
1623            processor
1624                .handle_link_negotiation(&payload, conn_id)
1625                .await
1626                .is_ok()
1627        );
1628        assert!(rx.try_recv().is_ok());
1629        // Second request: replay protection must suppress it, no reply.
1630        assert!(
1631            processor
1632                .handle_link_negotiation(&payload, conn_id)
1633                .await
1634                .is_ok()
1635        );
1636        assert!(rx.try_recv().is_err());
1637    }
1638
1639    #[tokio::test]
1640    async fn test_handle_link_negotiation_client_happy_path() {
1641        let processor = MessageProcessor::new();
1642        let (conn_id, _rx) = make_client_conn(&processor);
1643        let link_id = uuid::Uuid::new_v4().to_string();
1644        let conn = processor.forwarder().get_connection(conn_id).unwrap();
1645        conn.set_link_id(link_id.clone());
1646        let payload = LinkNegotiationPayload {
1647            link_id: link_id.clone(),
1648            slim_version: "2.0.0".into(),
1649            is_reply: true,
1650        };
1651        assert!(
1652            processor
1653                .handle_link_negotiation(&payload, conn_id)
1654                .await
1655                .is_ok()
1656        );
1657        assert_eq!(
1658            conn.remote_slim_version(),
1659            Some(semver::Version::parse("2.0.0").unwrap())
1660        );
1661    }
1662
1663    #[tokio::test]
1664    async fn test_handle_link_negotiation_client_link_id_mismatch_ignored() {
1665        let processor = MessageProcessor::new();
1666        let (conn_id, _rx) = make_client_conn(&processor);
1667        let conn = processor.forwarder().get_connection(conn_id).unwrap();
1668        conn.set_link_id("correct-id".to_string());
1669        let payload = LinkNegotiationPayload {
1670            link_id: "wrong-id".into(),
1671            slim_version: "1.0.0".into(),
1672            is_reply: true,
1673        };
1674        assert!(
1675            processor
1676                .handle_link_negotiation(&payload, conn_id)
1677                .await
1678                .is_ok()
1679        );
1680        assert!(conn.remote_slim_version().is_none());
1681    }
1682
1683    #[tokio::test]
1684    async fn test_handle_link_negotiation_client_replay_protection() {
1685        let processor = MessageProcessor::new();
1686        let (conn_id, _rx) = make_client_conn(&processor);
1687        let link_id = uuid::Uuid::new_v4().to_string();
1688        let conn = processor.forwarder().get_connection(conn_id).unwrap();
1689        conn.set_link_id(link_id.clone());
1690        let payload = LinkNegotiationPayload {
1691            link_id: link_id.clone(),
1692            slim_version: "1.0.0".into(),
1693            is_reply: true,
1694        };
1695        // First reply: accepted.
1696        assert!(
1697            processor
1698                .handle_link_negotiation(&payload, conn_id)
1699                .await
1700                .is_ok()
1701        );
1702        let stored = conn.remote_slim_version();
1703        assert!(stored.is_some());
1704        // Second reply: replay protection must reject it; version unchanged.
1705        assert!(
1706            processor
1707                .handle_link_negotiation(&payload, conn_id)
1708                .await
1709                .is_ok()
1710        );
1711        assert_eq!(conn.remote_slim_version(), stored);
1712    }
1713
1714    // ── process_subscription: remote ack path ─────────────────────────────────
1715
1716    /// Helper: negotiate a server connection to version `v` so
1717    /// `subscription_ack::supports` returns the expected value.
1718    fn negotiate_conn(processor: &MessageProcessor, conn_id: u64, version: &str) {
1719        let c = processor.forwarder().get_connection(conn_id).unwrap();
1720        c.complete_negotiation_as_server(
1721            &uuid::Uuid::new_v4().to_string(),
1722            semver::Version::parse(version).unwrap(),
1723        );
1724    }
1725
1726    #[tokio::test]
1727    async fn test_process_subscription_remote_ack_path_success() {
1728        // Arrange: relay processor, local app connection, and a "remote" server
1729        // connection whose version is ≥ 1.2.0.
1730        let processor = MessageProcessor::new();
1731        let (local_conn, _tx_local, mut rx_local) = processor
1732            .register_local_connection(false)
1733            .expect("failed to create local connection");
1734
1735        let (remote_conn, mut rx_remote) = make_server_conn(&processor);
1736        negotiate_conn(&processor, remote_conn, "1.2.0");
1737
1738        let source = Name::from_strings(["org", "ns", "src"]).with_id(1);
1739        let destination = Name::from_strings(["org", "ns", "dst"]).with_id(2);
1740        let upstream_ack_id: u64 = 100;
1741
1742        // Build subscribe: forward_to = remote_conn, with upstream ack ID.
1743        let sub_msg = Message::builder()
1744            .source(source.clone())
1745            .destination(destination.clone())
1746            .incoming_conn(local_conn)
1747            .forward_to(remote_conn)
1748            .subscription_id(upstream_ack_id)
1749            .build_subscribe()
1750            .unwrap();
1751
1752        // Act: process_subscription should spawn the retry task and return Ok(()).
1753        let result = processor
1754            .process_subscription(sub_msg, local_conn, true)
1755            .await;
1756        assert!(result.is_ok());
1757
1758        // The relay must have forwarded the subscribe to the remote connection.
1759        // Give the spawned task a moment to send the message.
1760        let forwarded = tokio::time::timeout(Duration::from_secs(1), rx_remote.recv())
1761            .await
1762            .expect("timeout waiting for forwarded subscribe")
1763            .expect("forwarded subscribe channel closed")
1764            .unwrap();
1765        assert!(matches!(forwarded.get_type(), SubscribeType(_)));
1766
1767        // The forwarded message must carry the same subscription_id as the original.
1768        let forwarded_sub_id = forwarded
1769            .get_subscription_id()
1770            .expect("forwarded subscribe must carry the same subscription_id");
1771        assert_eq!(
1772            forwarded_sub_id, upstream_ack_id,
1773            "subscription_id must not change when forwarding"
1774        );
1775
1776        // Simulate the remote node sending back a success SubscriptionAck.
1777        let ack = ProtoSubscriptionAck {
1778            subscription_id: upstream_ack_id,
1779            success: true,
1780            error: String::new(),
1781        };
1782        processor.internal.sub_ack_manager.resolve(
1783            ack.subscription_id,
1784            if ack.success {
1785                Ok(())
1786            } else {
1787                Err(DataPathError::RemoteSubscriptionAckError(ack.error.clone()))
1788            },
1789        );
1790
1791        // The relay must now forward the upstream ACK to the local connection.
1792        let upstream_ack = tokio::time::timeout(Duration::from_secs(2), rx_local.recv())
1793            .await
1794            .expect("timeout waiting for upstream ack")
1795            .expect("upstream ack channel closed")
1796            .expect("upstream ack should be Ok");
1797
1798        assert!(matches!(upstream_ack.get_type(), SubscriptionAckType(_)));
1799        let ack_inner = upstream_ack.get_subscription_ack();
1800        assert_eq!(ack_inner.subscription_id, upstream_ack_id);
1801        assert!(ack_inner.success);
1802    }
1803
1804    #[tokio::test]
1805    async fn test_process_subscription_remote_ack_path_old_node_immediate_ack() {
1806        // Old remote node (v < 1.2.0): should use the existing immediate-ack path.
1807        let processor = MessageProcessor::new();
1808        let (local_conn, _tx_local, mut rx_local) = processor
1809            .register_local_connection(false)
1810            .expect("failed to create local connection");
1811
1812        let (remote_conn, mut rx_remote) = make_server_conn(&processor);
1813        negotiate_conn(&processor, remote_conn, "1.1.0");
1814
1815        let source = Name::from_strings(["org", "ns", "src"]).with_id(1);
1816        let destination = Name::from_strings(["org", "ns", "dst"]).with_id(2);
1817        let upstream_ack_id: u64 = 101;
1818
1819        let sub_msg = Message::builder()
1820            .source(source.clone())
1821            .destination(destination.clone())
1822            .incoming_conn(local_conn)
1823            .forward_to(remote_conn)
1824            .subscription_id(upstream_ack_id)
1825            .build_subscribe()
1826            .unwrap();
1827
1828        processor
1829            .process_subscription(sub_msg, local_conn, true)
1830            .await
1831            .unwrap();
1832
1833        // Forwarded subscribe must have been sent to remote.
1834        // The subscription_id is a globally unique identifier that always travels
1835        // with the subscription, regardless of whether the remote supports acks.
1836        let forwarded = tokio::time::timeout(Duration::from_secs(1), rx_remote.recv())
1837            .await
1838            .expect("timeout waiting for forwarded subscribe")
1839            .expect("channel closed")
1840            .unwrap();
1841        assert!(matches!(forwarded.get_type(), SubscribeType(_)));
1842        let forwarded_sub_id = forwarded
1843            .get_subscription_id()
1844            .expect("forwarded subscribe must carry the subscription_id");
1845        assert_eq!(
1846            forwarded_sub_id, upstream_ack_id,
1847            "subscription_id must not change when forwarding"
1848        );
1849
1850        // Upstream ACK must be sent immediately (without waiting for remote).
1851        let upstream_ack = tokio::time::timeout(Duration::from_secs(1), rx_local.recv())
1852            .await
1853            .expect("timeout waiting for upstream ack")
1854            .expect("channel closed")
1855            .expect("upstream ack must be Ok");
1856
1857        assert!(matches!(upstream_ack.get_type(), SubscriptionAckType(_)));
1858        let ack = upstream_ack.get_subscription_ack();
1859        assert_eq!(ack.subscription_id, upstream_ack_id);
1860        assert!(ack.success);
1861    }
1862
1863    #[tokio::test]
1864    async fn test_process_subscription_remote_ack_error_forwarded_upstream() {
1865        // Remote node (v1.2.0) sends back a failure ACK; relay must forward it upstream.
1866        let processor = MessageProcessor::new();
1867        let (local_conn, _tx_local, mut rx_local) = processor
1868            .register_local_connection(false)
1869            .expect("failed to create local connection");
1870
1871        let (remote_conn, mut rx_remote) = make_server_conn(&processor);
1872        negotiate_conn(&processor, remote_conn, "1.2.0");
1873
1874        let source = Name::from_strings(["org", "ns", "src"]).with_id(1);
1875        let destination = Name::from_strings(["org", "ns", "dst"]).with_id(2);
1876        let upstream_ack_id: u64 = 102;
1877
1878        let sub_msg = Message::builder()
1879            .source(source.clone())
1880            .destination(destination.clone())
1881            .incoming_conn(local_conn)
1882            .forward_to(remote_conn)
1883            .subscription_id(upstream_ack_id)
1884            .build_subscribe()
1885            .unwrap();
1886
1887        processor
1888            .process_subscription(sub_msg, local_conn, true)
1889            .await
1890            .unwrap();
1891
1892        let forwarded = tokio::time::timeout(Duration::from_secs(1), rx_remote.recv())
1893            .await
1894            .expect("timeout")
1895            .expect("channel closed")
1896            .unwrap();
1897
1898        let forwarded_sub_id = forwarded
1899            .get_subscription_id()
1900            .expect("forwarded subscribe must carry the same subscription_id");
1901        assert_eq!(
1902            forwarded_sub_id, upstream_ack_id,
1903            "subscription_id must not change when forwarding"
1904        );
1905
1906        // Simulate remote failure via SubscriptionAck.
1907        let ack = ProtoSubscriptionAck {
1908            subscription_id: upstream_ack_id,
1909            success: false,
1910            error: "remote error".to_string(),
1911        };
1912        processor.internal.sub_ack_manager.resolve(
1913            ack.subscription_id,
1914            if ack.success {
1915                Ok(())
1916            } else {
1917                Err(DataPathError::RemoteSubscriptionAckError(ack.error.clone()))
1918            },
1919        );
1920
1921        let upstream_ack = tokio::time::timeout(Duration::from_secs(2), rx_local.recv())
1922            .await
1923            .expect("timeout")
1924            .expect("channel closed")
1925            .expect("must be Ok");
1926
1927        assert!(matches!(upstream_ack.get_type(), SubscriptionAckType(_)));
1928        let ack_inner = upstream_ack.get_subscription_ack();
1929        assert_eq!(ack_inner.subscription_id, upstream_ack_id);
1930        assert!(!ack_inner.success);
1931        assert!(!ack_inner.error.is_empty());
1932    }
1933
1934    // ── retry_loop tests ──────────────────────────────────────────────────────
1935
1936    fn make_test_subscribe(sub_id: u64) -> Message {
1937        let source = Name::from_strings(["org", "ns", "src"]).with_id(1);
1938        let destination = Name::from_strings(["org", "ns", "dst"]).with_id(2);
1939        Message::builder()
1940            .source(source)
1941            .destination(destination)
1942            .subscription_id(sub_id)
1943            .build_subscribe()
1944            .unwrap()
1945    }
1946
1947    #[tokio::test(start_paused = true)]
1948    async fn test_retry_loop_ack_received_before_timeout() {
1949        // ACK arrives within the first wait window → no retry send.
1950        let processor = MessageProcessor::new();
1951        let (local_conn, _tx_local, mut rx_local) = processor
1952            .register_local_connection(false)
1953            .expect("failed to create local connection");
1954        let (remote_conn, mut rx_remote) = make_server_conn(&processor);
1955
1956        let sub_id: u64 = 1000;
1957        let msg = make_test_subscribe(sub_id);
1958        let rx = processor.internal.sub_ack_manager.register(sub_id);
1959
1960        let proc_clone = processor.clone();
1961        let handle = tokio::spawn(crate::subscription_ack::retry_loop(
1962            proc_clone,
1963            sub_id,
1964            msg,
1965            remote_conn,
1966            local_conn,
1967            Some(sub_id),
1968            rx,
1969        ));
1970
1971        // Resolve immediately — the loop should receive it before the timeout.
1972        processor.internal.sub_ack_manager.resolve(sub_id, Ok(()));
1973
1974        handle.await.unwrap();
1975
1976        // No retry sends should have been made.
1977        assert!(
1978            rx_remote.try_recv().is_err(),
1979            "no retry send expected when ack arrives before timeout"
1980        );
1981
1982        // Upstream ack must have been sent.
1983        let ack = rx_local
1984            .try_recv()
1985            .expect("upstream ack should have been sent")
1986            .unwrap();
1987        assert!(ack.get_subscription_ack().success);
1988    }
1989
1990    #[tokio::test(start_paused = true)]
1991    async fn test_retry_loop_timeout_then_retry_send_then_ack() {
1992        // First wait times out → retry send → ACK arrives on second wait.
1993        let processor = MessageProcessor::new();
1994        let (local_conn, _tx_local, mut rx_local) = processor
1995            .register_local_connection(false)
1996            .expect("failed to create local connection");
1997        let (remote_conn, mut rx_remote) = make_server_conn(&processor);
1998
1999        let sub_id: u64 = 1001;
2000        let msg = make_test_subscribe(sub_id);
2001        let rx = processor.internal.sub_ack_manager.register(sub_id);
2002
2003        let proc_clone = processor.clone();
2004        let handle = tokio::spawn(crate::subscription_ack::retry_loop(
2005            proc_clone,
2006            sub_id,
2007            msg,
2008            remote_conn,
2009            local_conn,
2010            Some(sub_id),
2011            rx,
2012        ));
2013
2014        // Let the first timeout elapse → triggers a retry send.
2015        tokio::time::sleep(crate::subscription_ack::TIMEOUT + Duration::from_millis(100)).await;
2016
2017        // A retry message should have been sent.
2018        let retried = rx_remote
2019            .try_recv()
2020            .expect("retry send expected after first timeout")
2021            .unwrap();
2022        assert!(retried.get_subscription_id().is_some());
2023
2024        // Now resolve so the second wait succeeds.
2025        processor.internal.sub_ack_manager.resolve(sub_id, Ok(()));
2026
2027        handle.await.unwrap();
2028
2029        // Upstream success ack.
2030        let ack = rx_local
2031            .try_recv()
2032            .expect("upstream ack should have been sent")
2033            .unwrap();
2034        assert!(ack.get_subscription_ack().success);
2035    }
2036
2037    #[tokio::test(start_paused = true)]
2038    async fn test_retry_loop_retry_send_fails() {
2039        // Timeout → retry send fails because the connection is gone → loop
2040        // exits with the send error, upstream receives a failure ack.
2041        let processor = MessageProcessor::new();
2042        let (local_conn, _tx_local, mut rx_local) = processor
2043            .register_local_connection(false)
2044            .expect("failed to create local connection");
2045        let (remote_conn, _rx_remote) = make_server_conn(&processor);
2046
2047        let sub_id: u64 = 1002;
2048        let msg = make_test_subscribe(sub_id);
2049        let rx = processor.internal.sub_ack_manager.register(sub_id);
2050
2051        // Drop the remote connection so send_msg fails on retry.
2052        processor.connection_table().remove(remote_conn);
2053
2054        let proc_clone = processor.clone();
2055        let handle = tokio::spawn(crate::subscription_ack::retry_loop(
2056            proc_clone,
2057            sub_id,
2058            msg,
2059            remote_conn,
2060            local_conn,
2061            Some(sub_id),
2062            rx,
2063        ));
2064
2065        // Let the first timeout elapse → triggers a retry send which fails.
2066        tokio::time::sleep(crate::subscription_ack::TIMEOUT + Duration::from_millis(100)).await;
2067
2068        handle.await.unwrap();
2069
2070        // Upstream failure ack.
2071        let ack = rx_local
2072            .try_recv()
2073            .expect("upstream ack should have been sent")
2074            .unwrap();
2075        assert!(!ack.get_subscription_ack().success);
2076    }
2077
2078    #[tokio::test(start_paused = true)]
2079    async fn test_retry_loop_all_retries_exhausted() {
2080        // No ACK ever arrives → all waits time out → final_result is timeout error.
2081        let processor = MessageProcessor::new();
2082        let (local_conn, _tx_local, mut rx_local) = processor
2083            .register_local_connection(false)
2084            .expect("failed to create local connection");
2085        let (remote_conn, mut rx_remote) = make_server_conn(&processor);
2086
2087        let sub_id: u64 = 1003;
2088        let msg = make_test_subscribe(sub_id);
2089        let rx = processor.internal.sub_ack_manager.register(sub_id);
2090
2091        let proc_clone = processor.clone();
2092        let handle = tokio::spawn(crate::subscription_ack::retry_loop(
2093            proc_clone,
2094            sub_id,
2095            msg,
2096            remote_conn,
2097            local_conn,
2098            Some(sub_id),
2099            rx,
2100        ));
2101
2102        // Advance time past all retry windows: (MAX_RETRIES + 1) timeouts.
2103        for _ in 0..=crate::subscription_ack::MAX_RETRIES {
2104            tokio::time::sleep(crate::subscription_ack::TIMEOUT + Duration::from_millis(100)).await;
2105        }
2106
2107        handle.await.unwrap();
2108
2109        // Should have exactly MAX_RETRIES retry sends (attempts 0..MAX_RETRIES-1
2110        // trigger resends; the last attempt only waits).
2111        let mut retry_count = 0;
2112        while rx_remote.try_recv().is_ok() {
2113            retry_count += 1;
2114        }
2115        assert_eq!(
2116            retry_count,
2117            crate::subscription_ack::MAX_RETRIES as usize,
2118            "expected {} retry sends",
2119            crate::subscription_ack::MAX_RETRIES,
2120        );
2121
2122        // Upstream ack must indicate failure (timeout).
2123        let ack = rx_local
2124            .try_recv()
2125            .expect("upstream ack should have been sent")
2126            .unwrap();
2127        let ack_inner = ack.get_subscription_ack();
2128        assert!(
2129            !ack_inner.success,
2130            "ack must indicate failure after exhausting retries"
2131        );
2132        assert!(!ack_inner.error.is_empty());
2133    }
2134
2135    #[tokio::test(start_paused = true)]
2136    async fn test_retry_loop_no_upstream_subscription_id() {
2137        // When upstream_subscription_id is None, no upstream ack is sent.
2138        let processor = MessageProcessor::new();
2139        let (_local_conn, _tx_local, mut rx_local) = processor
2140            .register_local_connection(false)
2141            .expect("failed to create local connection");
2142        let (remote_conn, _rx_remote) = make_server_conn(&processor);
2143
2144        let sub_id: u64 = 1004;
2145        let msg = make_test_subscribe(sub_id);
2146        let rx = processor.internal.sub_ack_manager.register(sub_id);
2147
2148        let proc_clone = processor.clone();
2149        let handle = tokio::spawn(crate::subscription_ack::retry_loop(
2150            proc_clone,
2151            sub_id,
2152            msg,
2153            remote_conn,
2154            0, // in_connection — irrelevant since upstream_subscription_id is None
2155            None,
2156            rx,
2157        ));
2158
2159        // Resolve immediately.
2160        processor.internal.sub_ack_manager.resolve(sub_id, Ok(()));
2161
2162        handle.await.unwrap();
2163
2164        // No upstream ack should be sent.
2165        assert!(
2166            rx_local.try_recv().is_err(),
2167            "no upstream ack when upstream_subscription_id is None"
2168        );
2169    }
2170
2171    #[tokio::test(start_paused = true)]
2172    async fn test_retry_loop_sender_dropped() {
2173        // If the oneshot sender is dropped (e.g. processor shutdown), the loop
2174        // must exit promptly without panicking.
2175        let processor = MessageProcessor::new();
2176        let (local_conn, _tx_local, mut rx_local) = processor
2177            .register_local_connection(false)
2178            .expect("failed to create local connection");
2179        let (remote_conn, _rx_remote) = make_server_conn(&processor);
2180
2181        let sub_id: u64 = 1005;
2182        let msg = make_test_subscribe(sub_id);
2183        let rx = processor.internal.sub_ack_manager.register(sub_id);
2184
2185        // Drop the sender by removing the pending entry.
2186        processor.internal.sub_ack_manager.remove(sub_id);
2187
2188        let proc_clone = processor.clone();
2189        let handle = tokio::spawn(crate::subscription_ack::retry_loop(
2190            proc_clone,
2191            sub_id,
2192            msg,
2193            remote_conn,
2194            local_conn,
2195            Some(sub_id),
2196            rx,
2197        ));
2198
2199        handle.await.unwrap();
2200
2201        // Upstream failure ack (timeout error since we never got a result).
2202        let ack = rx_local
2203            .try_recv()
2204            .expect("upstream ack should have been sent")
2205            .unwrap();
2206        assert!(!ack.get_subscription_ack().success);
2207    }
2208
2209    // ── new_with_options ──────────────────────────────────────────────────────
2210
2211    #[test]
2212    fn test_new_with_options_custom_ttl() {
2213        let processor =
2214            MessageProcessor::new_with_options("svc".into(), Some(Duration::from_secs(5)));
2215        assert_eq!(
2216            processor.internal.recovery_table.ttl(),
2217            Duration::from_secs(5)
2218        );
2219    }
2220
2221    #[test]
2222    fn test_new_with_options_none_uses_default() {
2223        let processor = MessageProcessor::new_with_options("svc".into(), None);
2224        assert_eq!(
2225            processor.internal.recovery_table.ttl(),
2226            Duration::from_secs(30)
2227        );
2228    }
2229
2230    #[test]
2231    fn test_new_with_options_zero_ttl() {
2232        let processor = MessageProcessor::new_with_options("svc".into(), Some(Duration::ZERO));
2233        assert!(processor.internal.recovery_table.ttl().is_zero());
2234    }
2235
2236    // ── notify_control_plane_subscriptions_lost ───────────────────────────────
2237
2238    #[tokio::test]
2239    async fn test_notify_cp_subs_lost_sends_unsubscribes() {
2240        let (tx, mut rx) = mpsc::channel::<Result<Message, Status>>(16);
2241        let mut subs = HashMap::new();
2242        let name = Name::from_strings(["org", "default", "svc"]);
2243        subs.insert(name.clone(), HashSet::from([1u64, 2u64]));
2244
2245        MessageProcessor::notify_control_plane_subscriptions_lost(Some(tx), subs, 42).await;
2246
2247        let msg = rx.recv().await.unwrap().unwrap();
2248        assert!(matches!(msg.get_type(), UnsubscribeType(_)));
2249        assert_eq!(msg.get_source(), name.clone());
2250    }
2251
2252    #[tokio::test]
2253    async fn test_notify_cp_subs_lost_no_tx_is_noop() {
2254        let subs = HashMap::from([(
2255            Name::from_strings(["org", "default", "svc"]),
2256            HashSet::from([1u64]),
2257        )]);
2258        // Should not panic or hang.
2259        MessageProcessor::notify_control_plane_subscriptions_lost(None, subs, 1).await;
2260    }
2261
2262    #[tokio::test]
2263    async fn test_notify_cp_subs_lost_empty_subs() {
2264        let (tx, mut rx) = mpsc::channel::<Result<Message, Status>>(16);
2265        MessageProcessor::notify_control_plane_subscriptions_lost(Some(tx), HashMap::new(), 1)
2266            .await;
2267        // No messages should be sent.
2268        assert!(rx.try_recv().is_err());
2269    }
2270
2271    // ── route recovery on link negotiation ────────────────────────────────────
2272
2273    #[tokio::test]
2274    async fn test_link_negotiation_server_triggers_route_recovery() {
2275        let processor = MessageProcessor::new();
2276        let (conn_id, _rx) = make_server_conn(&processor);
2277
2278        let link_id = uuid::Uuid::new_v4().to_string();
2279        let sub_name = Name::from_strings(["org", "default", "recovered"]);
2280
2281        // Pre-populate the recovery table as if a prior connection dropped.
2282        let mut local_subs = HashMap::new();
2283        local_subs.insert(sub_name.clone(), HashSet::from([99u64]));
2284        processor
2285            .internal
2286            .recovery_table
2287            .store(link_id.clone(), local_subs, HashSet::new());
2288
2289        // Simulate the peer reconnecting with the same link_id.
2290        let payload = LinkNegotiationPayload {
2291            link_id: link_id.clone(),
2292            slim_version: "1.0.0".into(),
2293            is_reply: false,
2294        };
2295        processor
2296            .handle_link_negotiation(&payload, conn_id)
2297            .await
2298            .unwrap();
2299
2300        // The subscription should have been restored in the routing table.
2301        let result = processor
2302            .forwarder()
2303            .on_publish_msg_match(sub_name, u64::MAX, 1);
2304        assert!(result.is_ok(), "recovered subscription should be routable");
2305        assert_eq!(result.unwrap(), vec![conn_id]);
2306    }
2307
2308    #[tokio::test]
2309    async fn test_link_negotiation_server_recovery_restores_remote_subs() {
2310        let processor = MessageProcessor::new();
2311        let (conn_id, mut rx) = make_server_conn(&processor);
2312
2313        let link_id = uuid::Uuid::new_v4().to_string();
2314        let source = Name::from_strings(["org", "default", "src"]);
2315        let dest = Name::from_strings(["org", "default", "dst"]);
2316
2317        let remote_sub =
2318            SubscriptionInfo::new(source.clone(), dest.clone(), "identity".into(), conn_id, 42);
2319
2320        // Store recovery entry with remote subscriptions.
2321        processor.internal.recovery_table.store(
2322            link_id.clone(),
2323            HashMap::new(),
2324            HashSet::from([remote_sub]),
2325        );
2326
2327        let payload = LinkNegotiationPayload {
2328            link_id: link_id.clone(),
2329            slim_version: "1.0.0".into(),
2330            is_reply: false,
2331        };
2332        processor
2333            .handle_link_negotiation(&payload, conn_id)
2334            .await
2335            .unwrap();
2336
2337        // The restored subscribe is sent before the link negotiation reply.
2338        let sub_msg = rx.recv().await.unwrap().unwrap();
2339        assert!(matches!(sub_msg.get_type(), SubscribeType(_)));
2340        let reply = rx.recv().await.unwrap().unwrap();
2341        assert!(reply.is_link());
2342    }
2343
2344    #[tokio::test]
2345    async fn test_link_negotiation_server_no_recovery_entry() {
2346        let processor = MessageProcessor::new();
2347        let (conn_id, mut rx) = make_server_conn(&processor);
2348
2349        let link_id = uuid::Uuid::new_v4().to_string();
2350        // No recovery entry stored — normal negotiation, no restoration.
2351        let payload = LinkNegotiationPayload {
2352            link_id: link_id.clone(),
2353            slim_version: "1.0.0".into(),
2354            is_reply: false,
2355        };
2356        processor
2357            .handle_link_negotiation(&payload, conn_id)
2358            .await
2359            .unwrap();
2360
2361        // Only the reply should have been sent.
2362        let reply = rx.try_recv().unwrap().unwrap();
2363        assert!(reply.is_link());
2364        assert!(rx.try_recv().is_err());
2365    }
2366
2367    // ── restore_remote_subscriptions ──────────────────────────────────────────
2368
2369    #[tokio::test]
2370    async fn test_restore_remote_subscriptions_with_tracking() {
2371        let processor = MessageProcessor::new();
2372        let (conn_id, mut rx) = make_server_conn(&processor);
2373
2374        let source = Name::from_strings(["org", "default", "src"]);
2375        let dest = Name::from_strings(["org", "default", "dst"]);
2376        let sub = SubscriptionInfo::new(source.clone(), dest.clone(), "id1".into(), conn_id, 7);
2377        let subs = HashSet::from([sub]);
2378
2379        processor
2380            .restore_remote_subscriptions(&subs, conn_id, true)
2381            .await;
2382
2383        // The subscribe message should have been sent.
2384        let msg = rx.recv().await.unwrap().unwrap();
2385        assert!(matches!(msg.get_type(), SubscribeType(_)));
2386
2387        // With restore_tracking=true, the forwarded subscription should be tracked.
2388        let tracked = processor
2389            .forwarder()
2390            .get_subscriptions_forwarded_on_connection(conn_id);
2391        assert_eq!(tracked.len(), 1);
2392    }
2393
2394    #[tokio::test]
2395    async fn test_restore_remote_subscriptions_without_tracking() {
2396        let processor = MessageProcessor::new();
2397        let (conn_id, mut rx) = make_server_conn(&processor);
2398
2399        let source = Name::from_strings(["org", "default", "src"]);
2400        let dest = Name::from_strings(["org", "default", "dst"]);
2401        let sub = SubscriptionInfo::new(source.clone(), dest.clone(), "id1".into(), conn_id, 7);
2402        let subs = HashSet::from([sub]);
2403
2404        processor
2405            .restore_remote_subscriptions(&subs, conn_id, false)
2406            .await;
2407
2408        // Message sent.
2409        let msg = rx.recv().await.unwrap().unwrap();
2410        assert!(matches!(msg.get_type(), SubscribeType(_)));
2411
2412        // With restore_tracking=false, forwarded subscription table should NOT be updated.
2413        let tracked = processor
2414            .forwarder()
2415            .get_subscriptions_forwarded_on_connection(conn_id);
2416        assert!(tracked.is_empty());
2417    }
2418}