Skip to main content

reifydb_engine/
remote.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::HashMap,
6	sync::{RwLock, mpsc},
7};
8
9use reifydb_client::GrpcClient;
10use reifydb_runtime::SharedRuntime;
11use reifydb_type::{
12	error::{Diagnostic, Error},
13	params::Params,
14	value::frame::frame::Frame,
15};
16
17pub struct RemoteRegistry {
18	connections: RwLock<HashMap<String, GrpcClient>>,
19	runtime: SharedRuntime,
20}
21
22impl RemoteRegistry {
23	pub fn new(runtime: SharedRuntime) -> Self {
24		Self {
25			connections: RwLock::new(HashMap::new()),
26			runtime,
27		}
28	}
29
30	pub fn forward_query(&self, address: &str, rql: &str, params: Params) -> Result<Vec<Frame>, Error> {
31		let client = self.get_or_connect(address)?;
32		let params_opt = match &params {
33			Params::None => None,
34			_ => Some(params),
35		};
36
37		let rql = rql.to_string();
38		let (tx, rx) = mpsc::sync_channel(1);
39
40		self.runtime.spawn(async move {
41			let result = client.query(&rql, params_opt).await.map(|r| r.frames);
42			let _ = tx.send(result);
43		});
44
45		rx.recv()
46			.map_err(|_| {
47				Error(Diagnostic {
48					code: "REMOTE_002".to_string(),
49					message: "remote query channel closed".to_string(),
50					..Default::default()
51				})
52			})?
53			.map_err(Into::into)
54	}
55
56	fn get_or_connect(&self, address: &str) -> Result<GrpcClient, Error> {
57		// Fast path: check read lock
58		{
59			let cache = self.connections.read().unwrap();
60			if let Some(client) = cache.get(address) {
61				return Ok(client.clone());
62			}
63		}
64
65		// Slow path: connect via spawn + channel
66		let address_owned = address.to_string();
67		let (tx, rx) = mpsc::sync_channel(1);
68
69		self.runtime.spawn(async move {
70			let result = GrpcClient::connect(&address_owned).await;
71			let _ = tx.send(result);
72		});
73
74		let mut client = rx.recv().map_err(|_| {
75			Error(Diagnostic {
76				code: "REMOTE_002".to_string(),
77				message: "remote connect channel closed".to_string(),
78				..Default::default()
79			})
80		})??;
81		client.authenticate("service-token");
82		{
83			let mut cache = self.connections.write().unwrap();
84			cache.entry(address.to_string()).or_insert(client.clone());
85		}
86		Ok(client)
87	}
88}
89
90/// Check if an error represents a remote namespace query (REMOTE_001).
91pub fn is_remote_query(err: &Error) -> bool {
92	err.0.code == "REMOTE_001"
93}
94
95/// Extract the remote gRPC address from a REMOTE_001 error diagnostic.
96pub fn extract_remote_address(err: &Error) -> Option<String> {
97	err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
98}
99
#[cfg(test)]
mod tests {
	use reifydb_type::{error::Diagnostic, fragment::Fragment};

	use super::*;

	/// Address used by the tests that build a remote-namespace error.
	const ADDRESS: &str = "http://localhost:50051";

	/// Builds a `REMOTE_001` error whose message and notes mention `address`.
	fn make_remote_error(address: &str) -> Error {
		let message = format!(
			"Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
			address
		);
		let notes = vec![
			"Namespace 'remote_ns' is configured as a remote namespace".to_string(),
			format!("Remote gRPC address: {}", address),
		];

		Error(Diagnostic {
			code: "REMOTE_001".to_string(),
			message,
			notes,
			fragment: Fragment::None,
			..Default::default()
		})
	}

	/// A `REMOTE_001` error is recognised as a remote query.
	#[test]
	fn test_is_remote_query_true() {
		let remote = make_remote_error(ADDRESS);
		assert!(is_remote_query(&remote));
	}

	/// An error with a different code is not a remote query.
	#[test]
	fn test_is_remote_query_false() {
		let unrelated = Error(Diagnostic {
			code: "CATALOG_001".to_string(),
			message: "Table not found".to_string(),
			fragment: Fragment::None,
			..Default::default()
		});
		assert!(!is_remote_query(&unrelated));
	}

	/// The address is recovered from the "Remote gRPC address" note.
	#[test]
	fn test_extract_remote_address() {
		let remote = make_remote_error(ADDRESS);
		let extracted = extract_remote_address(&remote);
		assert_eq!(extracted.as_deref(), Some(ADDRESS));
	}

	/// Without any notes there is no address to extract.
	#[test]
	fn test_extract_remote_address_missing() {
		let without_notes = Error(Diagnostic {
			code: "REMOTE_001".to_string(),
			message: "Some error".to_string(),
			notes: vec![],
			fragment: Fragment::None,
			..Default::default()
		});
		assert!(extract_remote_address(&without_notes).is_none());
	}
}