Skip to main content

reifydb_engine/
remote.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4#[cfg(not(reifydb_single_threaded))]
5use std::{collections::HashMap, sync::mpsc};
6
7#[cfg(not(reifydb_single_threaded))]
8use reifydb_client::{GrpcClient, WireFormat};
9#[cfg(not(reifydb_single_threaded))]
10use reifydb_runtime::{SharedRuntime, sync::mutex::Mutex};
11#[cfg(not(reifydb_single_threaded))]
12use reifydb_value::error::Diagnostic;
13use reifydb_value::error::Error;
14#[cfg(not(reifydb_single_threaded))]
15use reifydb_value::{params::Params, value::frame::frame::Frame};
16
17#[cfg(not(reifydb_single_threaded))]
18type CacheKey = (String, Option<String>);
19
20#[cfg(not(reifydb_single_threaded))]
21pub struct RemoteRegistry {
22	runtime: SharedRuntime,
23	clients: Mutex<HashMap<CacheKey, GrpcClient>>,
24}
25
26#[cfg(not(reifydb_single_threaded))]
27impl RemoteRegistry {
28	pub fn new(runtime: SharedRuntime) -> Self {
29		Self {
30			runtime,
31			clients: Mutex::new(HashMap::new()),
32		}
33	}
34
35	pub fn forward_query(
36		&self,
37		address: &str,
38		rql: &str,
39		params: Params,
40		token: Option<&str>,
41	) -> Result<Vec<Frame>, Error> {
42		let params_opt = match &params {
43			Params::None => None,
44			_ => Some(params),
45		};
46
47		let client = self.get_or_connect(address, token)?;
48		match self.run_query(&client, rql, params_opt.clone()) {
49			Ok(frames) => Ok(frames),
50			Err(e) if is_transport_error(&e) => {
51				self.evict(address, token);
52				let client = self.get_or_connect(address, token)?;
53				self.run_query(&client, rql, params_opt)
54			}
55			Err(e) => Err(e),
56		}
57	}
58
59	fn run_query(&self, client: &GrpcClient, rql: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
60		let client = client.clone();
61		let rql = rql.to_string();
62		let (tx, rx) = mpsc::sync_channel(1);
63
64		self.runtime.spawn(async move {
65			let result = client.query(&rql, params).await;
66			let _ = tx.send(result);
67		});
68
69		rx.recv().map_err(|_| {
70			Error(Box::new(Diagnostic {
71				code: "REMOTE_002".to_string(),
72				message: "remote query channel closed".to_string(),
73				..Default::default()
74			}))
75		})?
76	}
77
78	fn get_or_connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
79		let key = cache_key(address, token);
80		if let Some(c) = self.clients.lock().get(&key) {
81			return Ok(c.clone());
82		}
83		let client = self.connect(address, token)?;
84		self.clients.lock().entry(key).or_insert_with(|| client.clone());
85		Ok(client)
86	}
87
88	fn evict(&self, address: &str, token: Option<&str>) {
89		self.clients.lock().remove(&cache_key(address, token));
90	}
91
92	#[cfg(test)]
93	fn cache_len(&self) -> usize {
94		self.clients.lock().len()
95	}
96
97	fn connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
98		let address_owned = address.to_string();
99		let (tx, rx) = mpsc::sync_channel(1);
100
101		self.runtime.spawn(async move {
102			let result = GrpcClient::connect(&address_owned, WireFormat::Proto).await;
103			let _ = tx.send(result);
104		});
105
106		let mut client = rx.recv().map_err(|_| {
107			Error(Box::new(Diagnostic {
108				code: "REMOTE_002".to_string(),
109				message: "remote connect channel closed".to_string(),
110				..Default::default()
111			}))
112		})??;
113		if let Some(token) = token {
114			client.authenticate(token);
115		}
116		Ok(client)
117	}
118}
119
120#[cfg(not(reifydb_single_threaded))]
121fn cache_key(address: &str, token: Option<&str>) -> CacheKey {
122	(address.to_string(), token.map(str::to_string))
123}
124
125#[cfg(not(reifydb_single_threaded))]
126fn is_transport_error(err: &Error) -> bool {
127	err.0.code.starts_with("GRPC_")
128}
129
130pub fn is_remote_query(err: &Error) -> bool {
131	err.0.code == "REMOTE_001"
132}
133
134pub fn extract_remote_address(err: &Error) -> Option<String> {
135	err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
136}
137
138pub fn extract_remote_token(err: &Error) -> Option<String> {
139	err.0.notes.iter().find_map(|n| n.strip_prefix("Remote token: ")).map(|s| s.to_string())
140}
141
// Unit tests for the remote-query helpers.
//
// Tests that touch `RemoteRegistry`, `cache_key`, `is_transport_error` or
// `Params` carry the same `cfg(not(reifydb_single_threaded))` gate as those
// items, so a single-threaded test build still compiles.
#[cfg(test)]
mod tests {
	#[cfg(not(reifydb_single_threaded))]
	use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig, pool::PoolConfig};
	use reifydb_value::{error::Diagnostic, fragment::Fragment};

	use super::*;

	/// Builds a `REMOTE_001` error whose notes carry `address` but no token.
	fn make_remote_error(address: &str) -> Error {
		Error(Box::new(Diagnostic {
			code: "REMOTE_001".to_string(),
			message: format!(
				"Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
				address
			),
			notes: vec![
				"Namespace 'remote_ns' is configured as a remote namespace".to_string(),
				format!("Remote gRPC address: {}", address),
			],
			fragment: Fragment::None,
			..Default::default()
		}))
	}

	#[test]
	fn test_is_remote_query_true() {
		let err = make_remote_error("http://localhost:50051");
		assert!(is_remote_query(&err));
	}

	#[test]
	fn test_is_remote_query_false() {
		let err = Error(Box::new(Diagnostic {
			code: "CATALOG_001".to_string(),
			message: "Table not found".to_string(),
			fragment: Fragment::None,
			..Default::default()
		}));
		assert!(!is_remote_query(&err));
	}

	#[test]
	fn test_extract_remote_address() {
		let err = make_remote_error("http://localhost:50051");
		assert_eq!(extract_remote_address(&err), Some("http://localhost:50051".to_string()));
	}

	#[test]
	fn test_extract_remote_address_missing() {
		let err = Error(Box::new(Diagnostic {
			code: "REMOTE_001".to_string(),
			message: "Some error".to_string(),
			notes: vec![],
			fragment: Fragment::None,
			..Default::default()
		}));
		assert_eq!(extract_remote_address(&err), None);
	}

	#[test]
	fn test_extract_remote_token() {
		let err = Error(Box::new(Diagnostic {
			code: "REMOTE_001".to_string(),
			message: "Remote namespace".to_string(),
			notes: vec![
				"Namespace 'test' is configured as a remote namespace".to_string(),
				"Remote gRPC address: http://localhost:50051".to_string(),
				"Remote token: my-secret".to_string(),
			],
			fragment: Fragment::None,
			..Default::default()
		}));
		assert_eq!(extract_remote_token(&err), Some("my-secret".to_string()));
	}

	#[test]
	fn test_extract_remote_token_missing() {
		let err = make_remote_error("http://localhost:50051");
		assert_eq!(extract_remote_token(&err), None);
	}

	#[cfg(not(reifydb_single_threaded))]
	#[test]
	fn test_is_transport_error() {
		let grpc_err = Error(Box::new(Diagnostic {
			code: "GRPC_Unavailable".to_string(),
			message: "channel closed".to_string(),
			..Default::default()
		}));
		assert!(is_transport_error(&grpc_err));

		let app_err = Error(Box::new(Diagnostic {
			code: "CATALOG_001".to_string(),
			message: "Table not found".to_string(),
			..Default::default()
		}));
		assert!(!is_transport_error(&app_err));
	}

	#[cfg(not(reifydb_single_threaded))]
	#[test]
	fn test_cache_key_distinguishes_tokens() {
		assert_ne!(cache_key("addr", Some("a")), cache_key("addr", Some("b")));
		assert_ne!(cache_key("addr", None), cache_key("addr", Some("a")));
		assert_eq!(cache_key("addr", Some("a")), cache_key("addr", Some("a")));
	}

	#[cfg(not(reifydb_single_threaded))]
	#[test]
	fn test_connect_failure_does_not_pollute_cache() {
		let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default(), PoolConfig::default());
		let registry = RemoteRegistry::new(runtime);

		// 127.0.0.1:1 is reserved; connect must fail fast.
		let err = registry.forward_query("http://127.0.0.1:1", "FROM x", Params::None, None).unwrap_err();
		assert!(err.0.code.starts_with("GRPC_") || err.0.code == "REMOTE_002");
		assert_eq!(registry.cache_len(), 0);
	}

	#[cfg(not(reifydb_single_threaded))]
	#[test]
	fn test_evict_missing_key_is_noop() {
		let runtime = SharedRuntime::from_config(SharedRuntimeConfig::default(), PoolConfig::default());
		let registry = RemoteRegistry::new(runtime);
		registry.evict("http://127.0.0.1:1", None);
		registry.evict("http://127.0.0.1:1", Some("tok"));
		assert_eq!(registry.cache_len(), 0);
	}
}