1#[cfg(not(reifydb_single_threaded))]
5use std::sync::mpsc;
6
7#[cfg(not(reifydb_single_threaded))]
8use reifydb_client::GrpcClient;
9#[cfg(not(reifydb_single_threaded))]
10use reifydb_runtime::SharedRuntime;
11#[cfg(not(reifydb_single_threaded))]
12use reifydb_type::error::Diagnostic;
13use reifydb_type::error::Error;
14#[cfg(not(reifydb_single_threaded))]
15use reifydb_type::{params::Params, value::frame::frame::Frame};
16
17#[cfg(not(reifydb_single_threaded))]
18pub struct RemoteRegistry {
19 runtime: SharedRuntime,
20}
21
22#[cfg(not(reifydb_single_threaded))]
23impl RemoteRegistry {
24 pub fn new(runtime: SharedRuntime) -> Self {
25 Self {
26 runtime,
27 }
28 }
29
30 pub fn forward_query(
31 &self,
32 address: &str,
33 rql: &str,
34 params: Params,
35 token: Option<&str>,
36 ) -> Result<Vec<Frame>, Error> {
37 let client = self.connect(address, token)?;
38
39 let params_opt = match ¶ms {
40 Params::None => None,
41 _ => Some(params),
42 };
43
44 let rql = rql.to_string();
45 let (tx, rx) = mpsc::sync_channel(1);
46
47 self.runtime.spawn(async move {
48 let result = client.query(&rql, params_opt).await.map(|r| r.frames);
49 let _ = tx.send(result);
50 });
51
52 rx.recv().map_err(|_| {
53 Error(Box::new(Diagnostic {
54 code: "REMOTE_002".to_string(),
55 message: "remote query channel closed".to_string(),
56 ..Default::default()
57 }))
58 })?
59 }
60
61 fn connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
62 let address_owned = address.to_string();
63 let (tx, rx) = mpsc::sync_channel(1);
64
65 self.runtime.spawn(async move {
66 let result = GrpcClient::connect(&address_owned).await;
67 let _ = tx.send(result);
68 });
69
70 let mut client = rx.recv().map_err(|_| {
71 Error(Box::new(Diagnostic {
72 code: "REMOTE_002".to_string(),
73 message: "remote connect channel closed".to_string(),
74 ..Default::default()
75 }))
76 })??;
77 if let Some(token) = token {
78 client.authenticate(token);
79 }
80 Ok(client)
81 }
82}
83
84pub fn is_remote_query(err: &Error) -> bool {
86 err.0.code == "REMOTE_001"
87}
88
89pub fn extract_remote_address(err: &Error) -> Option<String> {
91 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
92}
93
94pub fn extract_remote_token(err: &Error) -> Option<String> {
96 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote token: ")).map(|s| s.to_string())
97}
98
99#[cfg(test)]
100mod tests {
101 use reifydb_type::{error::Diagnostic, fragment::Fragment};
102
103 use super::*;
104
105 fn make_remote_error(address: &str) -> Error {
106 Error(Box::new(Diagnostic {
107 code: "REMOTE_001".to_string(),
108 message: format!(
109 "Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
110 address
111 ),
112 notes: vec![
113 "Namespace 'remote_ns' is configured as a remote namespace".to_string(),
114 format!("Remote gRPC address: {}", address),
115 ],
116 fragment: Fragment::None,
117 ..Default::default()
118 }))
119 }
120
121 #[test]
122 fn test_is_remote_query_true() {
123 let err = make_remote_error("http://localhost:50051");
124 assert!(is_remote_query(&err));
125 }
126
127 #[test]
128 fn test_is_remote_query_false() {
129 let err = Error(Box::new(Diagnostic {
130 code: "CATALOG_001".to_string(),
131 message: "Table not found".to_string(),
132 fragment: Fragment::None,
133 ..Default::default()
134 }));
135 assert!(!is_remote_query(&err));
136 }
137
138 #[test]
139 fn test_extract_remote_address() {
140 let err = make_remote_error("http://localhost:50051");
141 assert_eq!(extract_remote_address(&err), Some("http://localhost:50051".to_string()));
142 }
143
144 #[test]
145 fn test_extract_remote_address_missing() {
146 let err = Error(Box::new(Diagnostic {
147 code: "REMOTE_001".to_string(),
148 message: "Some error".to_string(),
149 notes: vec![],
150 fragment: Fragment::None,
151 ..Default::default()
152 }));
153 assert_eq!(extract_remote_address(&err), None);
154 }
155
156 #[test]
157 fn test_extract_remote_token() {
158 let err = Error(Box::new(Diagnostic {
159 code: "REMOTE_001".to_string(),
160 message: "Remote namespace".to_string(),
161 notes: vec![
162 "Namespace 'test' is configured as a remote namespace".to_string(),
163 "Remote gRPC address: http://localhost:50051".to_string(),
164 "Remote token: my-secret".to_string(),
165 ],
166 fragment: Fragment::None,
167 ..Default::default()
168 }));
169 assert_eq!(extract_remote_token(&err), Some("my-secret".to_string()));
170 }
171
172 #[test]
173 fn test_extract_remote_token_missing() {
174 let err = make_remote_error("http://localhost:50051");
175 assert_eq!(extract_remote_token(&err), None);
176 }
177}