1use std::sync::mpsc;
5
6use reifydb_client::GrpcClient;
7use reifydb_runtime::SharedRuntime;
8use reifydb_type::{
9 error::{Diagnostic, Error},
10 params::Params,
11 value::frame::frame::Frame,
12};
13
14pub struct RemoteRegistry {
15 runtime: SharedRuntime,
16}
17
18impl RemoteRegistry {
19 pub fn new(runtime: SharedRuntime) -> Self {
20 Self {
21 runtime,
22 }
23 }
24
25 pub fn forward_query(
26 &self,
27 address: &str,
28 rql: &str,
29 params: Params,
30 token: Option<&str>,
31 ) -> Result<Vec<Frame>, Error> {
32 let client = self.connect(address, token)?;
33 let params_opt = match ¶ms {
34 Params::None => None,
35 _ => Some(params),
36 };
37
38 let rql = rql.to_string();
39 let (tx, rx) = mpsc::sync_channel(1);
40
41 self.runtime.spawn(async move {
42 let result = client.query(&rql, params_opt).await.map(|r| r.frames);
43 let _ = tx.send(result);
44 });
45
46 rx.recv().map_err(|_| {
47 Error(Box::new(Diagnostic {
48 code: "REMOTE_002".to_string(),
49 message: "remote query channel closed".to_string(),
50 ..Default::default()
51 }))
52 })?
53 }
54
55 fn connect(&self, address: &str, token: Option<&str>) -> Result<GrpcClient, Error> {
56 let address_owned = address.to_string();
57 let (tx, rx) = mpsc::sync_channel(1);
58
59 self.runtime.spawn(async move {
60 let result = GrpcClient::connect(&address_owned).await;
61 let _ = tx.send(result);
62 });
63
64 let mut client = rx.recv().map_err(|_| {
65 Error(Box::new(Diagnostic {
66 code: "REMOTE_002".to_string(),
67 message: "remote connect channel closed".to_string(),
68 ..Default::default()
69 }))
70 })??;
71 if let Some(token) = token {
72 client.authenticate(token);
73 }
74 Ok(client)
75 }
76}
77
78pub fn is_remote_query(err: &Error) -> bool {
80 err.0.code == "REMOTE_001"
81}
82
83pub fn extract_remote_address(err: &Error) -> Option<String> {
85 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
86}
87
88pub fn extract_remote_token(err: &Error) -> Option<String> {
90 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote token: ")).map(|s| s.to_string())
91}
92
#[cfg(test)]
mod tests {
    use reifydb_type::{error::Diagnostic, fragment::Fragment};

    use super::*;

    /// Address shared by every test that needs a remote endpoint.
    const ADDRESS: &str = "http://localhost:50051";

    /// Wraps a diagnostic with the given code, message and notes in an `Error`.
    fn diagnostic_error(code: &str, message: String, notes: Vec<String>) -> Error {
        Error(Box::new(Diagnostic {
            code: code.to_string(),
            message,
            notes,
            fragment: Fragment::None,
            ..Default::default()
        }))
    }

    /// Builds a `REMOTE_001` error whose notes include the gRPC address but no token.
    fn make_remote_error(address: &str) -> Error {
        let message = format!(
            "Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
            address
        );
        let notes = vec![
            "Namespace 'remote_ns' is configured as a remote namespace".to_string(),
            format!("Remote gRPC address: {}", address),
        ];
        diagnostic_error("REMOTE_001", message, notes)
    }

    #[test]
    fn test_is_remote_query_true() {
        assert!(is_remote_query(&make_remote_error(ADDRESS)));
    }

    #[test]
    fn test_is_remote_query_false() {
        // Built inline so `notes` keeps whatever `Diagnostic::default()` provides.
        let err = Error(Box::new(Diagnostic {
            code: "CATALOG_001".to_string(),
            message: "Table not found".to_string(),
            fragment: Fragment::None,
            ..Default::default()
        }));
        assert!(!is_remote_query(&err));
    }

    #[test]
    fn test_extract_remote_address() {
        let err = make_remote_error(ADDRESS);
        assert_eq!(extract_remote_address(&err), Some(ADDRESS.to_string()));
    }

    #[test]
    fn test_extract_remote_address_missing() {
        let err = diagnostic_error("REMOTE_001", "Some error".to_string(), vec![]);
        assert_eq!(extract_remote_address(&err), None);
    }

    #[test]
    fn test_extract_remote_token() {
        let notes = vec![
            "Namespace 'test' is configured as a remote namespace".to_string(),
            "Remote gRPC address: http://localhost:50051".to_string(),
            "Remote token: my-secret".to_string(),
        ];
        let err = diagnostic_error("REMOTE_001", "Remote namespace".to_string(), notes);
        assert_eq!(extract_remote_token(&err), Some("my-secret".to_string()));
    }

    #[test]
    fn test_extract_remote_token_missing() {
        let err = make_remote_error(ADDRESS);
        assert_eq!(extract_remote_token(&err), None);
    }
}