Skip to main content

reifydb_engine/
remote.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::mpsc;
5
6use reifydb_client::GrpcClient;
7use reifydb_runtime::SharedRuntime;
8use reifydb_type::{
9	error::{Diagnostic, Error},
10	params::Params,
11	value::frame::frame::Frame,
12};
13
14pub struct RemoteRegistry {
15	runtime: SharedRuntime,
16}
17
18impl RemoteRegistry {
19	pub fn new(runtime: SharedRuntime) -> Self {
20		Self {
21			runtime,
22		}
23	}
24
25	pub fn forward_query(
26		&self,
27		address: &str,
28		rql: &str,
29		params: Params,
30		token: Option<&str>,
31	) -> Result<Vec<Frame>, Error> {
32		let client = self.connect(address, token)?;
33		let params_opt = match &params {
34			Params::None => None,
35			_ => Some(params),
36		};
37
38		let rql = rql.to_string();
39		let (tx, rx) = mpsc::sync_channel(1);
40
41		self.runtime.spawn(async move {
42			let result = client.query(&rql, params_opt).await.map(|r| r.frames);
43			let _ = tx.send(result);
44		});
45
46		rx.recv()
47			.map_err(|_| {
48				Error(Diagnostic {
49					code: "REMOTE_002".to_string(),
50					message: "remote query channel closed".to_string(),
51					..Default::default()
52				})
53			})?
54			.map_err(Into::into)
55	}
56
57	fn connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
58		let address_owned = address.to_string();
59		let (tx, rx) = mpsc::sync_channel(1);
60
61		self.runtime.spawn(async move {
62			let result = GrpcClient::connect(&address_owned).await;
63			let _ = tx.send(result);
64		});
65
66		let mut client = rx.recv().map_err(|_| {
67			Error(Diagnostic {
68				code: "REMOTE_002".to_string(),
69				message: "remote connect channel closed".to_string(),
70				..Default::default()
71			})
72		})??;
73		if let Some(token) = token {
74			client.authenticate(token);
75		}
76		Ok(client)
77	}
78}
79
80/// Check if an error represents a remote namespace query (REMOTE_001).
81pub fn is_remote_query(err: &Error) -> bool {
82	err.0.code == "REMOTE_001"
83}
84
85/// Extract the remote gRPC address from a REMOTE_001 error diagnostic.
86pub fn extract_remote_address(err: &Error) -> Option<String> {
87	err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
88}
89
90/// Extract the remote service token from a REMOTE_001 error diagnostic.
91pub fn extract_remote_token(err: &Error) -> Option<String> {
92	err.0.notes.iter().find_map(|n| n.strip_prefix("Remote token: ")).map(|s| s.to_string())
93}
94
95#[cfg(test)]
96mod tests {
97	use reifydb_type::{error::Diagnostic, fragment::Fragment};
98
99	use super::*;
100
101	fn make_remote_error(address: &str) -> Error {
102		Error(Diagnostic {
103			code: "REMOTE_001".to_string(),
104			message: format!(
105				"Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
106				address
107			),
108			notes: vec![
109				"Namespace 'remote_ns' is configured as a remote namespace".to_string(),
110				format!("Remote gRPC address: {}", address),
111			],
112			fragment: Fragment::None,
113			..Default::default()
114		})
115	}
116
117	#[test]
118	fn test_is_remote_query_true() {
119		let err = make_remote_error("http://localhost:50051");
120		assert!(is_remote_query(&err));
121	}
122
123	#[test]
124	fn test_is_remote_query_false() {
125		let err = Error(Diagnostic {
126			code: "CATALOG_001".to_string(),
127			message: "Table not found".to_string(),
128			fragment: Fragment::None,
129			..Default::default()
130		});
131		assert!(!is_remote_query(&err));
132	}
133
134	#[test]
135	fn test_extract_remote_address() {
136		let err = make_remote_error("http://localhost:50051");
137		assert_eq!(extract_remote_address(&err), Some("http://localhost:50051".to_string()));
138	}
139
140	#[test]
141	fn test_extract_remote_address_missing() {
142		let err = Error(Diagnostic {
143			code: "REMOTE_001".to_string(),
144			message: "Some error".to_string(),
145			notes: vec![],
146			fragment: Fragment::None,
147			..Default::default()
148		});
149		assert_eq!(extract_remote_address(&err), None);
150	}
151
152	#[test]
153	fn test_extract_remote_token() {
154		let err = Error(Diagnostic {
155			code: "REMOTE_001".to_string(),
156			message: "Remote namespace".to_string(),
157			notes: vec![
158				"Namespace 'test' is configured as a remote namespace".to_string(),
159				"Remote gRPC address: http://localhost:50051".to_string(),
160				"Remote token: my-secret".to_string(),
161			],
162			fragment: Fragment::None,
163			..Default::default()
164		});
165		assert_eq!(extract_remote_token(&err), Some("my-secret".to_string()));
166	}
167
168	#[test]
169	fn test_extract_remote_token_missing() {
170		let err = make_remote_error("http://localhost:50051");
171		assert_eq!(extract_remote_token(&err), None);
172	}
173}