Skip to main content

slim_datapath/sync/
remote.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4//! Remote/controller connection sync.
5//!
6//! [`RemoteSync`] tracks subscriptions forwarded to each remote connection
7//! and handles restore on reconnect. Unlike peer connections (which do a full
8//! sync), remote connections only replay the specific set of subscriptions that
9//! were previously forwarded to them.
10
11use std::collections::{HashMap, HashSet};
12
13use display_error_chain::ErrorChainExt;
14use parking_lot::RwLock;
15use tracing::error;
16
17use crate::api::ProtoName;
18use crate::message_processing::MessageProcessor;
19
20// ─── SubscriptionInfo ────────────────────────────────────────────────────────
21
22#[derive(Hash, Eq, PartialEq, Debug, Clone)]
23pub struct SubscriptionInfo {
24    source: ProtoName,
25    name: ProtoName,
26    source_identity: String,
27    subscription_id: u64,
28}
29
30impl SubscriptionInfo {
31    #[cfg(test)]
32    pub(crate) fn new(
33        source: ProtoName,
34        name: ProtoName,
35        source_identity: String,
36        _conn: u64,
37        subscription_id: u64,
38    ) -> Self {
39        Self {
40            source,
41            name,
42            source_identity,
43            subscription_id,
44        }
45    }
46
47    pub fn source(&self) -> &ProtoName {
48        &self.source
49    }
50
51    pub fn source_identity(&self) -> &String {
52        &self.source_identity
53    }
54
55    pub fn name(&self) -> &ProtoName {
56        &self.name
57    }
58
59    pub fn subscription_id(&self) -> u64 {
60        self.subscription_id
61    }
62}
63
64// ─── RemoteSync ──────────────────────────────────────────────────────────────
65
66/// Manages subscription tracking and restore for remote (non-peer) connections.
67///
68/// Event-driven interface used by the message processor:
69/// - [`on_forwarded_subscription`](Self::on_forwarded_subscription): a sub was sent to a remote conn
70/// - [`on_connection_drop`](Self::on_connection_drop): a remote connection was lost
71/// - [`get_subscriptions_for_reconnect`](Self::get_subscriptions_for_reconnect): snapshot before client reconnect
72/// - [`restore`](Self::restore): replay subscriptions after reconnect
73#[derive(Debug, Default)]
74pub struct RemoteSync {
75    /// Subscriptions forwarded to each remote connection, keyed by conn_id.
76    table: RwLock<HashMap<u64, HashSet<SubscriptionInfo>>>,
77}
78
79impl RemoteSync {
80    /// Record that a subscription was forwarded (or unforwarded) on a remote connection.
81    pub fn on_forwarded_subscription(
82        &self,
83        source: ProtoName,
84        name: ProtoName,
85        source_identity: String,
86        conn: u64,
87        add: bool,
88        subscription_id: u64,
89    ) {
90        let info = SubscriptionInfo {
91            source,
92            name,
93            source_identity,
94            subscription_id,
95        };
96        let mut map = self.table.write();
97        if add {
98            map.entry(conn).or_default().insert(info);
99        } else {
100            match map.get_mut(&conn) {
101                None => {
102                    error!(%conn, "on_forwarded_subscription(remove): connection not found");
103                }
104                Some(set) => {
105                    set.remove(&info);
106                    if set.is_empty() {
107                        map.remove(&conn);
108                    }
109                }
110            }
111        }
112    }
113
114    /// A remote connection dropped — remove and return its tracked subscriptions.
115    pub fn on_connection_drop(&self, conn: u64) -> HashSet<SubscriptionInfo> {
116        let mut map = self.table.write();
117        map.remove(&conn).unwrap_or_default()
118    }
119
120    /// Snapshot the subscriptions forwarded on a connection (for client-side reconnect).
121    /// Unlike `on_connection_drop`, this does NOT remove them from the table.
122    pub fn get_subscriptions_for_reconnect(&self, conn: u64) -> HashSet<SubscriptionInfo> {
123        let map = self.table.read();
124        map.get(&conn).cloned().unwrap_or_default()
125    }
126
127    /// Re-send previously-forwarded subscriptions to a remote connection after reconnect.
128    ///
129    /// When `restore_tracking` is `true`, also re-registers each subscription in the
130    /// tracking table.
131    ///
132    /// When `restore_tracking` is `false` (client-side reconnect), the tracking table was
133    /// never cleaned (reconnect reuses the same slot), so no re-registration is needed.
134    pub async fn restore(
135        &self,
136        mp: &MessageProcessor,
137        remote_subs: &HashSet<SubscriptionInfo>,
138        conn_index: u64,
139        restore_tracking: bool,
140    ) {
141        for r in remote_subs {
142            let sub_msg = crate::api::proto::dataplane::v1::Message::builder()
143                .source(r.source().clone())
144                .destination(r.name().clone())
145                .identity(r.source_identity())
146                .build_subscribe()
147                .unwrap();
148            if let Err(e) = mp.send_msg(sub_msg, conn_index).await {
149                error!(
150                    error = %e.chain(), %conn_index,
151                    "error restoring subscription on remote node",
152                );
153            } else if restore_tracking {
154                self.on_forwarded_subscription(
155                    r.source().clone(),
156                    r.name().clone(),
157                    r.source_identity().clone(),
158                    conn_index,
159                    true,
160                    r.subscription_id(),
161                );
162            }
163        }
164    }
165}