slim_datapath/sync/remote.rs
1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4//! Remote/controller connection sync.
5//!
6//! [`RemoteSync`] tracks subscriptions forwarded to each remote connection
7//! and handles restore on reconnect. Unlike peer connections (which do a full
8//! sync), remote connections only replay the specific set of subscriptions that
9//! were previously forwarded to them.
10
11use std::collections::{HashMap, HashSet};
12
13use display_error_chain::ErrorChainExt;
14use parking_lot::RwLock;
15use tracing::error;
16
17use crate::api::ProtoName;
18use crate::message_processing::MessageProcessor;
19
20// ─── SubscriptionInfo ────────────────────────────────────────────────────────
21
22#[derive(Hash, Eq, PartialEq, Debug, Clone)]
23pub struct SubscriptionInfo {
24 source: ProtoName,
25 name: ProtoName,
26 source_identity: String,
27 subscription_id: u64,
28}
29
30impl SubscriptionInfo {
31 #[cfg(test)]
32 pub(crate) fn new(
33 source: ProtoName,
34 name: ProtoName,
35 source_identity: String,
36 _conn: u64,
37 subscription_id: u64,
38 ) -> Self {
39 Self {
40 source,
41 name,
42 source_identity,
43 subscription_id,
44 }
45 }
46
47 pub fn source(&self) -> &ProtoName {
48 &self.source
49 }
50
51 pub fn source_identity(&self) -> &String {
52 &self.source_identity
53 }
54
55 pub fn name(&self) -> &ProtoName {
56 &self.name
57 }
58
59 pub fn subscription_id(&self) -> u64 {
60 self.subscription_id
61 }
62}
63
64// ─── RemoteSync ──────────────────────────────────────────────────────────────
65
66/// Manages subscription tracking and restore for remote (non-peer) connections.
67///
68/// Event-driven interface used by the message processor:
69/// - [`on_forwarded_subscription`](Self::on_forwarded_subscription): a sub was sent to a remote conn
70/// - [`on_connection_drop`](Self::on_connection_drop): a remote connection was lost
71/// - [`get_subscriptions_for_reconnect`](Self::get_subscriptions_for_reconnect): snapshot before client reconnect
72/// - [`restore`](Self::restore): replay subscriptions after reconnect
73#[derive(Debug, Default)]
74pub struct RemoteSync {
75 /// Subscriptions forwarded to each remote connection, keyed by conn_id.
76 table: RwLock<HashMap<u64, HashSet<SubscriptionInfo>>>,
77}
78
79impl RemoteSync {
80 /// Record that a subscription was forwarded (or unforwarded) on a remote connection.
81 pub fn on_forwarded_subscription(
82 &self,
83 source: ProtoName,
84 name: ProtoName,
85 source_identity: String,
86 conn: u64,
87 add: bool,
88 subscription_id: u64,
89 ) {
90 let info = SubscriptionInfo {
91 source,
92 name,
93 source_identity,
94 subscription_id,
95 };
96 let mut map = self.table.write();
97 if add {
98 map.entry(conn).or_default().insert(info);
99 } else {
100 match map.get_mut(&conn) {
101 None => {
102 error!(%conn, "on_forwarded_subscription(remove): connection not found");
103 }
104 Some(set) => {
105 set.remove(&info);
106 if set.is_empty() {
107 map.remove(&conn);
108 }
109 }
110 }
111 }
112 }
113
114 /// A remote connection dropped — remove and return its tracked subscriptions.
115 pub fn on_connection_drop(&self, conn: u64) -> HashSet<SubscriptionInfo> {
116 let mut map = self.table.write();
117 map.remove(&conn).unwrap_or_default()
118 }
119
120 /// Snapshot the subscriptions forwarded on a connection (for client-side reconnect).
121 /// Unlike `on_connection_drop`, this does NOT remove them from the table.
122 pub fn get_subscriptions_for_reconnect(&self, conn: u64) -> HashSet<SubscriptionInfo> {
123 let map = self.table.read();
124 map.get(&conn).cloned().unwrap_or_default()
125 }
126
127 /// Re-send previously-forwarded subscriptions to a remote connection after reconnect.
128 ///
129 /// When `restore_tracking` is `true`, also re-registers each subscription in the
130 /// tracking table.
131 ///
132 /// When `restore_tracking` is `false` (client-side reconnect), the tracking table was
133 /// never cleaned (reconnect reuses the same slot), so no re-registration is needed.
134 pub async fn restore(
135 &self,
136 mp: &MessageProcessor,
137 remote_subs: &HashSet<SubscriptionInfo>,
138 conn_index: u64,
139 restore_tracking: bool,
140 ) {
141 for r in remote_subs {
142 let sub_msg = crate::api::proto::dataplane::v1::Message::builder()
143 .source(r.source().clone())
144 .destination(r.name().clone())
145 .identity(r.source_identity())
146 .build_subscribe()
147 .unwrap();
148 if let Err(e) = mp.send_msg(sub_msg, conn_index).await {
149 error!(
150 error = %e.chain(), %conn_index,
151 "error restoring subscription on remote node",
152 );
153 } else if restore_tracking {
154 self.on_forwarded_subscription(
155 r.source().clone(),
156 r.name().clone(),
157 r.source_identity().clone(),
158 conn_index,
159 true,
160 r.subscription_id(),
161 );
162 }
163 }
164 }
165}