Skip to main content

reifydb_engine/
remote.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::mpsc;
5
6use reifydb_client::GrpcClient;
7use reifydb_runtime::SharedRuntime;
8use reifydb_type::{
9	error::{Diagnostic, Error},
10	params::Params,
11	value::frame::frame::Frame,
12};
13
14pub struct RemoteRegistry {
15	runtime: SharedRuntime,
16}
17
18impl RemoteRegistry {
19	pub fn new(runtime: SharedRuntime) -> Self {
20		Self {
21			runtime,
22		}
23	}
24
25	pub fn forward_query(
26		&self,
27		address: &str,
28		rql: &str,
29		params: Params,
30		token: Option<&str>,
31	) -> Result<Vec<Frame>, Error> {
32		let client = self.connect(address, token)?;
33		let params_opt = match &params {
34			Params::None => None,
35			_ => Some(params),
36		};
37
38		let rql = rql.to_string();
39		let (tx, rx) = mpsc::sync_channel(1);
40
41		self.runtime.spawn(async move {
42			let result = client.query(&rql, params_opt).await.map(|r| r.frames);
43			let _ = tx.send(result);
44		});
45
46		rx.recv().map_err(|_| {
47			Error(Box::new(Diagnostic {
48				code: "REMOTE_002".to_string(),
49				message: "remote query channel closed".to_string(),
50				..Default::default()
51			}))
52		})?
53	}
54
55	fn connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
56		let address_owned = address.to_string();
57		let (tx, rx) = mpsc::sync_channel(1);
58
59		self.runtime.spawn(async move {
60			let result = GrpcClient::connect(&address_owned).await;
61			let _ = tx.send(result);
62		});
63
64		let mut client = rx.recv().map_err(|_| {
65			Error(Box::new(Diagnostic {
66				code: "REMOTE_002".to_string(),
67				message: "remote connect channel closed".to_string(),
68				..Default::default()
69			}))
70		})??;
71		if let Some(token) = token {
72			client.authenticate(token);
73		}
74		Ok(client)
75	}
76}
77
78/// Check if an error represents a remote namespace query (REMOTE_001).
79pub fn is_remote_query(err: &Error) -> bool {
80	err.0.code == "REMOTE_001"
81}
82
83/// Extract the remote gRPC address from a REMOTE_001 error diagnostic.
84pub fn extract_remote_address(err: &Error) -> Option<String> {
85	err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
86}
87
88/// Extract the remote service token from a REMOTE_001 error diagnostic.
89pub fn extract_remote_token(err: &Error) -> Option<String> {
90	err.0.notes.iter().find_map(|n| n.strip_prefix("Remote token: ")).map(|s| s.to_string())
91}
92
#[cfg(test)]
mod tests {
	use reifydb_type::{error::Diagnostic, fragment::Fragment};

	use super::*;

	/// Endpoint used by every test that needs a remote address.
	const ADDRESS: &str = "http://localhost:50051";

	/// Build a REMOTE_001 error with the given message and notes and no fragment.
	fn remote_error_with_notes(message: String, notes: Vec<String>) -> Error {
		let diagnostic = Diagnostic {
			code: "REMOTE_001".to_string(),
			message,
			notes,
			fragment: Fragment::None,
			..Default::default()
		};
		Error(Box::new(diagnostic))
	}

	/// Build a REMOTE_001 error whose notes carry the gRPC address but no token.
	fn make_remote_error(address: &str) -> Error {
		let message = format!(
			"Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
			address
		);
		let notes = vec![
			"Namespace 'remote_ns' is configured as a remote namespace".to_string(),
			format!("Remote gRPC address: {}", address),
		];
		remote_error_with_notes(message, notes)
	}

	#[test]
	fn test_is_remote_query_true() {
		assert!(is_remote_query(&make_remote_error(ADDRESS)));
	}

	#[test]
	fn test_is_remote_query_false() {
		// A diagnostic from an unrelated subsystem must not be treated as a remote query.
		let diagnostic = Diagnostic {
			code: "CATALOG_001".to_string(),
			message: "Table not found".to_string(),
			fragment: Fragment::None,
			..Default::default()
		};
		let err = Error(Box::new(diagnostic));
		assert!(!is_remote_query(&err));
	}

	#[test]
	fn test_extract_remote_address() {
		let address = extract_remote_address(&make_remote_error(ADDRESS));
		assert_eq!(address.as_deref(), Some(ADDRESS));
	}

	#[test]
	fn test_extract_remote_address_missing() {
		// Right code, but no notes at all: there is nothing to extract.
		let err = remote_error_with_notes("Some error".to_string(), vec![]);
		assert!(extract_remote_address(&err).is_none());
	}

	#[test]
	fn test_extract_remote_token() {
		let notes = vec![
			"Namespace 'test' is configured as a remote namespace".to_string(),
			"Remote gRPC address: http://localhost:50051".to_string(),
			"Remote token: my-secret".to_string(),
		];
		let err = remote_error_with_notes("Remote namespace".to_string(), notes);
		assert_eq!(extract_remote_token(&err).as_deref(), Some("my-secret"));
	}

	#[test]
	fn test_extract_remote_token_missing() {
		// The default remote error only carries the address note.
		let err = make_remote_error(ADDRESS);
		assert!(extract_remote_token(&err).is_none());
	}
}