1use std::{
5 collections::HashMap,
6 sync::{RwLock, mpsc},
7};
8
9use reifydb_client::GrpcClient;
10use reifydb_runtime::SharedRuntime;
11use reifydb_type::{
12 error::{Diagnostic, Error},
13 params::Params,
14 value::frame::frame::Frame,
15};
16
17pub struct RemoteRegistry {
18 connections: RwLock<HashMap<String, GrpcClient>>,
19 runtime: SharedRuntime,
20}
21
22impl RemoteRegistry {
23 pub fn new(runtime: SharedRuntime) -> Self {
24 Self {
25 connections: RwLock::new(HashMap::new()),
26 runtime,
27 }
28 }
29
30 pub fn forward_query(&self, address: &str, rql: &str, params: Params) -> Result<Vec<Frame>, Error> {
31 let client = self.get_or_connect(address)?;
32 let params_opt = match ¶ms {
33 Params::None => None,
34 _ => Some(params),
35 };
36
37 let rql = rql.to_string();
38 let (tx, rx) = mpsc::sync_channel(1);
39
40 self.runtime.spawn(async move {
41 let result = client.query(&rql, params_opt).await.map(|r| r.frames);
42 let _ = tx.send(result);
43 });
44
45 rx.recv()
46 .map_err(|_| {
47 Error(Diagnostic {
48 code: "REMOTE_002".to_string(),
49 message: "remote query channel closed".to_string(),
50 ..Default::default()
51 })
52 })?
53 .map_err(Into::into)
54 }
55
56 fn get_or_connect(&self, address: &str) -> Result<GrpcClient, Error> {
57 {
59 let cache = self.connections.read().unwrap();
60 if let Some(client) = cache.get(address) {
61 return Ok(client.clone());
62 }
63 }
64
65 let address_owned = address.to_string();
67 let (tx, rx) = mpsc::sync_channel(1);
68
69 self.runtime.spawn(async move {
70 let result = GrpcClient::connect(&address_owned).await;
71 let _ = tx.send(result);
72 });
73
74 let mut client = rx.recv().map_err(|_| {
75 Error(Diagnostic {
76 code: "REMOTE_002".to_string(),
77 message: "remote connect channel closed".to_string(),
78 ..Default::default()
79 })
80 })??;
81 client.authenticate("service-token");
82 {
83 let mut cache = self.connections.write().unwrap();
84 cache.entry(address.to_string()).or_insert(client.clone());
85 }
86 Ok(client)
87 }
88}
89
90pub fn is_remote_query(err: &Error) -> bool {
92 err.0.code == "REMOTE_001"
93}
94
95pub fn extract_remote_address(err: &Error) -> Option<String> {
97 err.0.notes.iter().find_map(|n| n.strip_prefix("Remote gRPC address: ")).map(|s| s.to_string())
98}
99
#[cfg(test)]
mod tests {
    use reifydb_type::{error::Diagnostic, fragment::Fragment};

    use super::*;

    /// Address used by the tests that need a well-formed remote error.
    const ADDRESS: &str = "http://localhost:50051";

    /// Builds a `REMOTE_001` error whose notes include the address note that
    /// `extract_remote_address` looks for.
    fn make_remote_error(address: &str) -> Error {
        let message = format!(
            "Remote namespace 'remote_ns': source 'users' is on remote instance at {}",
            address
        );
        let notes = vec![
            "Namespace 'remote_ns' is configured as a remote namespace".to_string(),
            format!("Remote gRPC address: {}", address),
        ];

        Error(Diagnostic {
            code: "REMOTE_001".to_string(),
            message,
            notes,
            fragment: Fragment::None,
            ..Default::default()
        })
    }

    #[test]
    fn test_is_remote_query_true() {
        let remote = make_remote_error(ADDRESS);
        assert!(is_remote_query(&remote));
    }

    #[test]
    fn test_is_remote_query_false() {
        // A different diagnostic code must not be treated as a remote query.
        let diagnostic = Diagnostic {
            code: "CATALOG_001".to_string(),
            message: "Table not found".to_string(),
            fragment: Fragment::None,
            ..Default::default()
        };
        let unrelated = Error(diagnostic);
        assert!(!is_remote_query(&unrelated));
    }

    #[test]
    fn test_extract_remote_address() {
        let remote = make_remote_error(ADDRESS);
        let extracted = extract_remote_address(&remote);
        assert_eq!(extracted, Some(ADDRESS.to_string()));
    }

    #[test]
    fn test_extract_remote_address_missing() {
        // Right code, but no notes at all: there is nothing to extract.
        let diagnostic = Diagnostic {
            code: "REMOTE_001".to_string(),
            message: "Some error".to_string(),
            notes: vec![],
            fragment: Fragment::None,
            ..Default::default()
        };
        let without_notes = Error(diagnostic);
        assert!(extract_remote_address(&without_notes).is_none());
    }
}