Skip to main content

reifydb_engine/
remote.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4#[cfg(not(reifydb_single_threaded))]
5use std::sync::mpsc;
6
7#[cfg(not(reifydb_single_threaded))]
8use reifydb_client::GrpcClient;
9#[cfg(not(reifydb_single_threaded))]
10use reifydb_runtime::SharedRuntime;
11#[cfg(not(reifydb_single_threaded))]
12use reifydb_type::error::Diagnostic;
13use reifydb_type::error::Error;
14#[cfg(not(reifydb_single_threaded))]
15use reifydb_type::{params::Params, value::frame::frame::Frame};
16
17#[cfg(not(reifydb_single_threaded))]
18pub struct RemoteRegistry {
19	runtime: SharedRuntime,
20}
21
22#[cfg(not(reifydb_single_threaded))]
23impl RemoteRegistry {
24	pub fn new(runtime: SharedRuntime) -> Self {
25		Self {
26			runtime,
27		}
28	}
29
30	pub fn forward_query(
31		&self,
32		address: &str,
33		rql: &str,
34		params: Params,
35		token: Option<&str>,
36	) -> Result<Vec<Frame>, Error> {
37		let client = self.connect(address, token)?;
38
39		let params_opt = match &params {
40			Params::None => None,
41			_ => Some(params),
42		};
43
44		let rql = rql.to_string();
45		let (tx, rx) = mpsc::sync_channel(1);
46
47		self.runtime.spawn(async move {
48			let result = client.query(&rql, params_opt).await.map(|r| r.frames);
49			let _ = tx.send(result);
50		});
51
52		rx.recv().map_err(|_| {
53			Error(Box::new(Diagnostic {
54				code: "REMOTE_002".to_string(),
55				message: "remote query channel closed".to_string(),
56				..Default::default()
57			}))
58		})?
59	}
60
61	fn connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
62		let address_owned = address.to_string();
63		let (tx, rx) = mpsc::sync_channel(1);
64
65		self.runtime.spawn(async move {
66			let result = GrpcClient::connect(&address_owned).await;
67			let _ = tx.send(result);
68		});
69
70		let mut client = rx.recv().map_err(|_| {
71			Error(Box::new(Diagnostic {
72				code: "REMOTE_002".to_string(),
73				message: "remote connect channel closed".to_string(),
74				..Default::default()
75			}))
76		})??;
77		if let Some(token) = token {
78			client.authenticate(token);
79		}
80		Ok(client)
81	}
82}
83
84/// Check if an error represents a remote namespace query (REMOTE_001).
85pub fn is_remote_query(err: &Error) -> bool {
86	err.0.code == "REMOTE_001"
87}
88
89/// Extract the remote gRPC address from a REMOTE_001 error diagnostic.
90pub fn extract_remote_address(err: &Error) -> Option<String> {
91	err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
92}
93
94/// Extract the remote service token from a REMOTE_001 error diagnostic.
95pub fn extract_remote_token(err: &Error) -> Option<String> {
96	err.0.notes.iter().find_map(|n| n.strip_prefix("Remote token: ")).map(|s| s.to_string())
97}
98
99#[cfg(test)]
100mod tests {
101	use reifydb_type::{error::Diagnostic, fragment::Fragment};
102
103	use super::*;
104
105	fn make_remote_error(address: &str) -> Error {
106		Error(Box::new(Diagnostic {
107			code: "REMOTE_001".to_string(),
108			message: format!(
109				"Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
110				address
111			),
112			notes: vec![
113				"Namespace 'remote_ns' is configured as a remote namespace".to_string(),
114				format!("Remote gRPC address: {}", address),
115			],
116			fragment: Fragment::None,
117			..Default::default()
118		}))
119	}
120
121	#[test]
122	fn test_is_remote_query_true() {
123		let err = make_remote_error("http://localhost:50051");
124		assert!(is_remote_query(&err));
125	}
126
127	#[test]
128	fn test_is_remote_query_false() {
129		let err = Error(Box::new(Diagnostic {
130			code: "CATALOG_001".to_string(),
131			message: "Table not found".to_string(),
132			fragment: Fragment::None,
133			..Default::default()
134		}));
135		assert!(!is_remote_query(&err));
136	}
137
138	#[test]
139	fn test_extract_remote_address() {
140		let err = make_remote_error("http://localhost:50051");
141		assert_eq!(extract_remote_address(&err), Some("http://localhost:50051".to_string()));
142	}
143
144	#[test]
145	fn test_extract_remote_address_missing() {
146		let err = Error(Box::new(Diagnostic {
147			code: "REMOTE_001".to_string(),
148			message: "Some error".to_string(),
149			notes: vec![],
150			fragment: Fragment::None,
151			..Default::default()
152		}));
153		assert_eq!(extract_remote_address(&err), None);
154	}
155
156	#[test]
157	fn test_extract_remote_token() {
158		let err = Error(Box::new(Diagnostic {
159			code: "REMOTE_001".to_string(),
160			message: "Remote namespace".to_string(),
161			notes: vec![
162				"Namespace 'test' is configured as a remote namespace".to_string(),
163				"Remote gRPC address: http://localhost:50051".to_string(),
164				"Remote token: my-secret".to_string(),
165			],
166			fragment: Fragment::None,
167			..Default::default()
168		}));
169		assert_eq!(extract_remote_token(&err), Some("my-secret".to_string()));
170	}
171
172	#[test]
173	fn test_extract_remote_token_missing() {
174		let err = make_remote_error("http://localhost:50051");
175		assert_eq!(extract_remote_token(&err), None);
176	}
177}