1#[cfg(not(reifydb_single_threaded))]
5use std::{collections::HashMap, sync::mpsc};
6
7#[cfg(not(reifydb_single_threaded))]
8use reifydb_client::{GrpcClient, WireFormat};
9#[cfg(not(reifydb_single_threaded))]
10use reifydb_runtime::{SharedRuntime, sync::mutex::Mutex};
11#[cfg(not(reifydb_single_threaded))]
12use reifydb_value::error::Diagnostic;
13use reifydb_value::error::Error;
14#[cfg(not(reifydb_single_threaded))]
15use reifydb_value::{params::Params, value::frame::frame::Frame};
16
17#[cfg(not(reifydb_single_threaded))]
18type CacheKey = (String, Option<String>);
19
20#[cfg(not(reifydb_single_threaded))]
21pub struct RemoteRegistry {
22 runtime: SharedRuntime,
23 clients: Mutex<HashMap<CacheKey, GrpcClient>>,
24}
25
26#[cfg(not(reifydb_single_threaded))]
27impl RemoteRegistry {
28 pub fn new(runtime: SharedRuntime) -> Self {
29 Self {
30 runtime,
31 clients: Mutex::new(HashMap::new()),
32 }
33 }
34
35 pub fn forward_query(
36 &self,
37 address: &str,
38 rql: &str,
39 params: Params,
40 token: Option<&str>,
41 ) -> Result<Vec<Frame>, Error> {
42 let params_opt = match ¶ms {
43 Params::None => None,
44 _ => Some(params),
45 };
46
47 let client = self.get_or_connect(address, token)?;
48 match self.run_query(&client, rql, params_opt.clone()) {
49 Ok(frames) => Ok(frames),
50 Err(e) if is_transport_error(&e) => {
51 self.evict(address, token);
52 let client = self.get_or_connect(address, token)?;
53 self.run_query(&client, rql, params_opt)
54 }
55 Err(e) => Err(e),
56 }
57 }
58
59 fn run_query(&self, client: &GrpcClient, rql: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
60 let client = client.clone();
61 let rql = rql.to_string();
62 let (tx, rx) = mpsc::sync_channel(1);
63
64 self.runtime.spawn(async move {
65 let result = client.query(&rql, params).await;
66 let _ = tx.send(result);
67 });
68
69 rx.recv().map_err(|_| {
70 Error(Box::new(Diagnostic {
71 code: "REMOTE_002".to_string(),
72 message: "remote query channel closed".to_string(),
73 ..Default::default()
74 }))
75 })?
76 }
77
78 fn get_or_connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
79 let key = cache_key(address, token);
80 if let Some(c) = self.clients.lock().get(&key) {
81 return Ok(c.clone());
82 }
83 let client = self.connect(address, token)?;
84 self.clients.lock().entry(key).or_insert_with(|| client.clone());
85 Ok(client)
86 }
87
88 fn evict(&self, address: &str, token: Option<&str>) {
89 self.clients.lock().remove(&cache_key(address, token));
90 }
91
92 #[cfg(test)]
93 fn cache_len(&self) -> usize {
94 self.clients.lock().len()
95 }
96
97 fn connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
98 let address_owned = address.to_string();
99 let (tx, rx) = mpsc::sync_channel(1);
100
101 self.runtime.spawn(async move {
102 let result = GrpcClient::connect(&address_owned, WireFormat::Proto).await;
103 let _ = tx.send(result);
104 });
105
106 let mut client = rx.recv().map_err(|_| {
107 Error(Box::new(Diagnostic {
108 code: "REMOTE_002".to_string(),
109 message: "remote connect channel closed".to_string(),
110 ..Default::default()
111 }))
112 })??;
113 if let Some(token) = token {
114 client.authenticate(token);
115 }
116 Ok(client)
117 }
118}
119
120#[cfg(not(reifydb_single_threaded))]
121fn cache_key(address: &str, token: Option<&str>) -> CacheKey {
122 (address.to_string(), token.map(str::to_string))
123}
124
125#[cfg(not(reifydb_single_threaded))]
126fn is_transport_error(err: &Error) -> bool {
127 err.0.code.starts_with("GRPC_")
128}
129
130pub fn is_remote_query(err: &Error) -> bool {
131 err.0.code == "REMOTE_001"
132}
133
134pub fn extract_remote_address(err: &Error) -> Option<String> {
135 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
136}
137
138pub fn extract_remote_token(err: &Error) -> Option<String> {
139 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote token: ")).map(|s| s.to_string())
140}
141
142#[cfg(test)]
143mod tests {
144 use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig, pool::PoolConfig};
145 use reifydb_value::{error::Diagnostic, fragment::Fragment};
146
147 use super::*;
148
149 fn make_remote_error(address: &str) -> Error {
150 Error(Box::new(Diagnostic {
151 code: "REMOTE_001".to_string(),
152 message: format!(
153 "Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
154 address
155 ),
156 notes: vec![
157 "Namespace 'remote_ns' is configured as a remote namespace".to_string(),
158 format!("Remote gRPC address: {}", address),
159 ],
160 fragment: Fragment::None,
161 ..Default::default()
162 }))
163 }
164
165 #[test]
166 fn test_is_remote_query_true() {
167 let err = make_remote_error("http://localhost:50051");
168 assert!(is_remote_query(&err));
169 }
170
171 #[test]
172 fn test_is_remote_query_false() {
173 let err = Error(Box::new(Diagnostic {
174 code: "CATALOG_001".to_string(),
175 message: "Table not found".to_string(),
176 fragment: Fragment::None,
177 ..Default::default()
178 }));
179 assert!(!is_remote_query(&err));
180 }
181
182 #[test]
183 fn test_extract_remote_address() {
184 let err = make_remote_error("http://localhost:50051");
185 assert_eq!(extract_remote_address(&err), Some("http://localhost:50051".to_string()));
186 }
187
188 #[test]
189 fn test_extract_remote_address_missing() {
190 let err = Error(Box::new(Diagnostic {
191 code: "REMOTE_001".to_string(),
192 message: "Some error".to_string(),
193 notes: vec![],
194 fragment: Fragment::None,
195 ..Default::default()
196 }));
197 assert_eq!(extract_remote_address(&err), None);
198 }
199
200 #[test]
201 fn test_extract_remote_token() {
202 let err = Error(Box::new(Diagnostic {
203 code: "REMOTE_001".to_string(),
204 message: "Remote namespace".to_string(),
205 notes: vec![
206 "Namespace 'test' is configured as a remote namespace".to_string(),
207 "Remote gRPC address: http://localhost:50051".to_string(),
208 "Remote token: my-secret".to_string(),
209 ],
210 fragment: Fragment::None,
211 ..Default::default()
212 }));
213 assert_eq!(extract_remote_token(&err), Some("my-secret".to_string()));
214 }
215
216 #[test]
217 fn test_extract_remote_token_missing() {
218 let err = make_remote_error("http://localhost:50051");
219 assert_eq!(extract_remote_token(&err), None);
220 }
221
222 #[test]
223 fn test_is_transport_error() {
224 let grpc_err = Error(Box::new(Diagnostic {
225 code: "GRPC_Unavailable".to_string(),
226 message: "channel closed".to_string(),
227 ..Default::default()
228 }));
229 assert!(is_transport_error(&grpc_err));
230
231 let app_err = Error(Box::new(Diagnostic {
232 code: "CATALOG_001".to_string(),
233 message: "Table not found".to_string(),
234 ..Default::default()
235 }));
236 assert!(!is_transport_error(&app_err));
237 }
238
239 #[test]
240 fn test_cache_key_distinguishes_tokens() {
241 assert_ne!(cache_key("addr", Some("a")), cache_key("addr", Some("b")));
242 assert_ne!(cache_key("addr", None), cache_key("addr", Some("a")));
243 assert_eq!(cache_key("addr", Some("a")), cache_key("addr", Some("a")));
244 }
245
246 #[test]
247 fn test_connect_failure_does_not_pollute_cache() {
248 let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default(), PoolConfig::default());
249 let registry = RemoteRegistry::new(runtime);
250
251 let err = registry.forward_query("http://127.0.0.1:1", "FROM x", Params::None, None).unwrap_err();
253 assert!(err.0.code.starts_with("GRPC_") || err.0.code == "REMOTE_002");
254 assert_eq!(registry.cache_len(), 0);
255 }
256
257 #[test]
258 fn test_evict_missing_key_is_noop() {
259 let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default(), PoolConfig::default());
260 let registry = RemoteRegistry::new(runtime);
261 registry.evict("http://127.0.0.1:1", None);
262 registry.evict("http://127.0.0.1:1", Some("tok"));
263 assert_eq!(registry.cache_len(), 0);
264 }
265}