1use std::sync::mpsc;
5
6use reifydb_client::GrpcClient;
7use reifydb_runtime::SharedRuntime;
8use reifydb_type::{
9 error::{Diagnostic, Error},
10 params::Params,
11 value::frame::frame::Frame,
12};
13
14pub struct RemoteRegistry {
15 runtime: SharedRuntime,
16}
17
18impl RemoteRegistry {
19 pub fn new(runtime: SharedRuntime) -> Self {
20 Self {
21 runtime,
22 }
23 }
24
25 pub fn forward_query(
26 &self,
27 address: &str,
28 rql: &str,
29 params: Params,
30 token: Option<&str>,
31 ) -> Result<Vec<Frame>, Error> {
32 let client = self.connect(address, token)?;
33 let params_opt = match ¶ms {
34 Params::None => None,
35 _ => Some(params),
36 };
37
38 let rql = rql.to_string();
39 let (tx, rx) = mpsc::sync_channel(1);
40
41 self.runtime.spawn(async move {
42 let result = client.query(&rql, params_opt).await.map(|r| r.frames);
43 let _ = tx.send(result);
44 });
45
46 rx.recv()
47 .map_err(|_| {
48 Error(Diagnostic {
49 code: "REMOTE_002".to_string(),
50 message: "remote query channel closed".to_string(),
51 ..Default::default()
52 })
53 })?
54 .map_err(Into::into)
55 }
56
57 fn connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
58 let address_owned = address.to_string();
59 let (tx, rx) = mpsc::sync_channel(1);
60
61 self.runtime.spawn(async move {
62 let result = GrpcClient::connect(&address_owned).await;
63 let _ = tx.send(result);
64 });
65
66 let mut client = rx.recv().map_err(|_| {
67 Error(Diagnostic {
68 code: "REMOTE_002".to_string(),
69 message: "remote connect channel closed".to_string(),
70 ..Default::default()
71 })
72 })??;
73 if let Some(token) = token {
74 client.authenticate(token);
75 }
76 Ok(client)
77 }
78}
79
80pub fn is_remote_query(err: &Error) -> bool {
82 err.0.code == "REMOTE_001"
83}
84
85pub fn extract_remote_address(err: &Error) -> Option<String> {
87 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
88}
89
90pub fn extract_remote_token(err: &Error) -> Option<String> {
92 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote token: ")).map(|s| s.to_string())
93}
94
#[cfg(test)]
mod tests {
    use reifydb_type::{error::Diagnostic, fragment::Fragment};

    use super::*;

    /// Endpoint shared by the tests that need a remote address.
    const ENDPOINT: &str = "http://localhost:50051";

    /// Builds a `REMOTE_001` error carrying the given message and notes.
    fn remote_error(message: String, notes: Vec<String>) -> Error {
        let diagnostic = Diagnostic {
            code: "REMOTE_001".to_string(),
            message,
            notes,
            fragment: Fragment::None,
            ..Default::default()
        };
        Error(diagnostic)
    }

    /// Builds a `REMOTE_001` error whose notes contain the gRPC address for
    /// `address` and no token note.
    fn remote_error_at(address: &str) -> Error {
        remote_error(
            format!(
                "Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
                address
            ),
            vec![
                "Namespace 'remote_ns' is configured as a remote namespace".to_string(),
                format!("Remote gRPC address: {}", address),
            ],
        )
    }

    #[test]
    fn test_is_remote_query_true() {
        let remote = remote_error_at(ENDPOINT);
        assert!(is_remote_query(&remote));
    }

    #[test]
    fn test_is_remote_query_false() {
        // A non-remote diagnostic code must not be treated as a remote query.
        let catalog_error = Error(Diagnostic {
            code: "CATALOG_001".to_string(),
            message: "Table not found".to_string(),
            fragment: Fragment::None,
            ..Default::default()
        });
        assert!(!is_remote_query(&catalog_error));
    }

    #[test]
    fn test_extract_remote_address() {
        let found = extract_remote_address(&remote_error_at(ENDPOINT));
        assert_eq!(found.as_deref(), Some(ENDPOINT));
    }

    #[test]
    fn test_extract_remote_address_missing() {
        let without_notes = remote_error("Some error".to_string(), Vec::new());
        assert!(extract_remote_address(&without_notes).is_none());
    }

    #[test]
    fn test_extract_remote_token() {
        let with_token = remote_error(
            "Remote namespace".to_string(),
            vec![
                "Namespace 'test' is configured as a remote namespace".to_string(),
                "Remote gRPC address: http://localhost:50051".to_string(),
                "Remote token: my-secret".to_string(),
            ],
        );
        assert_eq!(extract_remote_token(&with_token).as_deref(), Some("my-secret"));
    }

    #[test]
    fn test_extract_remote_token_missing() {
        // The address-only fixture has no "Remote token: " note.
        assert!(extract_remote_token(&remote_error_at(ENDPOINT)).is_none());
    }
}