1#[cfg(not(reifydb_single_threaded))]
5use std::{
6 collections::HashMap,
7 sync::{Mutex, mpsc},
8};
9
10#[cfg(not(reifydb_single_threaded))]
11use reifydb_client::{GrpcClient, WireFormat};
12#[cfg(not(reifydb_single_threaded))]
13use reifydb_runtime::SharedRuntime;
14#[cfg(not(reifydb_single_threaded))]
15use reifydb_type::error::Diagnostic;
16use reifydb_type::error::Error;
17#[cfg(not(reifydb_single_threaded))]
18use reifydb_type::{params::Params, value::frame::frame::Frame};
19
20#[cfg(not(reifydb_single_threaded))]
21type CacheKey = (String, Option<String>);
22
23#[cfg(not(reifydb_single_threaded))]
24pub struct RemoteRegistry {
25 runtime: SharedRuntime,
26 clients: Mutex<HashMap<CacheKey, GrpcClient>>,
27}
28
29#[cfg(not(reifydb_single_threaded))]
30impl RemoteRegistry {
31 pub fn new(runtime: SharedRuntime) -> Self {
32 Self {
33 runtime,
34 clients: Mutex::new(HashMap::new()),
35 }
36 }
37
38 pub fn forward_query(
39 &self,
40 address: &str,
41 rql: &str,
42 params: Params,
43 token: Option<&str>,
44 ) -> Result<Vec<Frame>, Error> {
45 let params_opt = match ¶ms {
46 Params::None => None,
47 _ => Some(params),
48 };
49
50 let client = self.get_or_connect(address, token)?;
51 match self.run_query(&client, rql, params_opt.clone()) {
52 Ok(frames) => Ok(frames),
53 Err(e) if is_transport_error(&e) => {
54 self.evict(address, token);
55 let client = self.get_or_connect(address, token)?;
56 self.run_query(&client, rql, params_opt)
57 }
58 Err(e) => Err(e),
59 }
60 }
61
62 fn run_query(&self, client: &GrpcClient, rql: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
63 let client = client.clone();
64 let rql = rql.to_string();
65 let (tx, rx) = mpsc::sync_channel(1);
66
67 self.runtime.spawn(async move {
68 let result = client.query(&rql, params).await;
69 let _ = tx.send(result);
70 });
71
72 rx.recv().map_err(|_| {
73 Error(Box::new(Diagnostic {
74 code: "REMOTE_002".to_string(),
75 message: "remote query channel closed".to_string(),
76 ..Default::default()
77 }))
78 })?
79 }
80
81 fn get_or_connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
82 let key = cache_key(address, token);
83 if let Some(c) = self.clients.lock().unwrap().get(&key) {
84 return Ok(c.clone());
85 }
86 let client = self.connect(address, token)?;
87 self.clients.lock().unwrap().entry(key).or_insert_with(|| client.clone());
88 Ok(client)
89 }
90
91 fn evict(&self, address: &str, token: Option<&str>) {
92 self.clients.lock().unwrap().remove(&cache_key(address, token));
93 }
94
95 #[cfg(test)]
96 fn cache_len(&self) -> usize {
97 self.clients.lock().unwrap().len()
98 }
99
100 fn connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
101 let address_owned = address.to_string();
102 let (tx, rx) = mpsc::sync_channel(1);
103
104 self.runtime.spawn(async move {
105 let result = GrpcClient::connect(&address_owned, WireFormat::Proto).await;
106 let _ = tx.send(result);
107 });
108
109 let mut client = rx.recv().map_err(|_| {
110 Error(Box::new(Diagnostic {
111 code: "REMOTE_002".to_string(),
112 message: "remote connect channel closed".to_string(),
113 ..Default::default()
114 }))
115 })??;
116 if let Some(token) = token {
117 client.authenticate(token);
118 }
119 Ok(client)
120 }
121}
122
123#[cfg(not(reifydb_single_threaded))]
124fn cache_key(address: &str, token: Option<&str>) -> CacheKey {
125 (address.to_string(), token.map(str::to_string))
126}
127
128#[cfg(not(reifydb_single_threaded))]
132fn is_transport_error(err: &Error) -> bool {
133 err.0.code.starts_with("GRPC_")
134}
135
136pub fn is_remote_query(err: &Error) -> bool {
138 err.0.code == "REMOTE_001"
139}
140
141pub fn extract_remote_address(err: &Error) -> Option<String> {
143 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
144}
145
146pub fn extract_remote_token(err: &Error) -> Option<String> {
148 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote token: ")).map(|s| s.to_string())
149}
150
151#[cfg(test)]
152mod tests {
153 use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig};
154 use reifydb_type::{error::Diagnostic, fragment::Fragment};
155
156 use super::*;
157
158 fn make_remote_error(address: &str) -> Error {
159 Error(Box::new(Diagnostic {
160 code: "REMOTE_001".to_string(),
161 message: format!(
162 "Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
163 address
164 ),
165 notes: vec![
166 "Namespace 'remote_ns' is configured as a remote namespace".to_string(),
167 format!("Remote gRPC address: {}", address),
168 ],
169 fragment: Fragment::None,
170 ..Default::default()
171 }))
172 }
173
174 #[test]
175 fn test_is_remote_query_true() {
176 let err = make_remote_error("http://localhost:50051");
177 assert!(is_remote_query(&err));
178 }
179
180 #[test]
181 fn test_is_remote_query_false() {
182 let err = Error(Box::new(Diagnostic {
183 code: "CATALOG_001".to_string(),
184 message: "Table not found".to_string(),
185 fragment: Fragment::None,
186 ..Default::default()
187 }));
188 assert!(!is_remote_query(&err));
189 }
190
191 #[test]
192 fn test_extract_remote_address() {
193 let err = make_remote_error("http://localhost:50051");
194 assert_eq!(extract_remote_address(&err), Some("http://localhost:50051".to_string()));
195 }
196
197 #[test]
198 fn test_extract_remote_address_missing() {
199 let err = Error(Box::new(Diagnostic {
200 code: "REMOTE_001".to_string(),
201 message: "Some error".to_string(),
202 notes: vec![],
203 fragment: Fragment::None,
204 ..Default::default()
205 }));
206 assert_eq!(extract_remote_address(&err), None);
207 }
208
209 #[test]
210 fn test_extract_remote_token() {
211 let err = Error(Box::new(Diagnostic {
212 code: "REMOTE_001".to_string(),
213 message: "Remote namespace".to_string(),
214 notes: vec![
215 "Namespace 'test' is configured as a remote namespace".to_string(),
216 "Remote gRPC address: http://localhost:50051".to_string(),
217 "Remote token: my-secret".to_string(),
218 ],
219 fragment: Fragment::None,
220 ..Default::default()
221 }));
222 assert_eq!(extract_remote_token(&err), Some("my-secret".to_string()));
223 }
224
225 #[test]
226 fn test_extract_remote_token_missing() {
227 let err = make_remote_error("http://localhost:50051");
228 assert_eq!(extract_remote_token(&err), None);
229 }
230
231 #[test]
232 fn test_is_transport_error() {
233 let grpc_err = Error(Box::new(Diagnostic {
234 code: "GRPC_Unavailable".to_string(),
235 message: "channel closed".to_string(),
236 ..Default::default()
237 }));
238 assert!(is_transport_error(&grpc_err));
239
240 let app_err = Error(Box::new(Diagnostic {
241 code: "CATALOG_001".to_string(),
242 message: "Table not found".to_string(),
243 ..Default::default()
244 }));
245 assert!(!is_transport_error(&app_err));
246 }
247
248 #[test]
249 fn test_cache_key_distinguishes_tokens() {
250 assert_ne!(cache_key("addr", Some("a")), cache_key("addr", Some("b")));
251 assert_ne!(cache_key("addr", None), cache_key("addr", Some("a")));
252 assert_eq!(cache_key("addr", Some("a")), cache_key("addr", Some("a")));
253 }
254
255 #[test]
256 fn test_connect_failure_does_not_pollute_cache() {
257 let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default());
258 let registry = RemoteRegistry::new(runtime);
259
260 let err = registry.forward_query("http://127.0.0.1:1", "FROM x", Params::None, None).unwrap_err();
262 assert!(err.0.code.starts_with("GRPC_") || err.0.code == "REMOTE_002");
263 assert_eq!(registry.cache_len(), 0);
264 }
265
266 #[test]
267 fn test_evict_missing_key_is_noop() {
268 let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default());
269 let registry = RemoteRegistry::new(runtime);
270 registry.evict("http://127.0.0.1:1", None);
271 registry.evict("http://127.0.0.1:1", Some("tok"));
272 assert_eq!(registry.cache_len(), 0);
273 }
274}