reifydb_remote_proxy/
lib.rs1#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![cfg_attr(not(debug_assertions), deny(warnings))]
6
7use std::fmt;
13
14use reifydb_client::{GrpcClient, GrpcSubscription, WireFormat};
15use reifydb_type::value::frame::frame::Frame;
16use tokio::{
17 select,
18 sync::{mpsc, watch},
19};
20
/// Errors that can occur while establishing a remote subscription.
///
/// Each variant carries the stringified message of the underlying client
/// error, so the type does not depend on the client's error types.
#[derive(Debug)]
pub enum RemoteSubscriptionError {
    /// Opening the connection to the remote address failed.
    Connect(String),
    /// The connection succeeded but the subscribe request failed.
    Subscribe(String),
}

impl fmt::Display for RemoteSubscriptionError {
    /// Writes a human-readable message consisting of a fixed prefix for the
    /// failing step followed by the wrapped error text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Connect(e) => write!(f, "Failed to connect to remote: {}", e),
            Self::Subscribe(e) => write!(f, "Remote subscribe failed: {}", e),
        }
    }
}

// Implementing `Error` lets callers box this type as `dyn Error`, propagate it
// with `?` into `Box<dyn Error>`, and use it with generic error-handling code.
// The wrapped messages are plain strings, so there is no `source()` to expose.
impl std::error::Error for RemoteSubscriptionError {}
36
37pub struct RemoteSubscription {
39 inner: GrpcSubscription,
40 subscription_id: String,
41}
42
43impl RemoteSubscription {
44 pub fn subscription_id(&self) -> &str {
46 &self.subscription_id
47 }
48}
49
50pub async fn connect_remote(
52 address: &str,
53 rql: &str,
54 token: Option<&str>,
55) -> Result<RemoteSubscription, RemoteSubscriptionError> {
56 let mut client = GrpcClient::connect(address, WireFormat::Proto)
57 .await
58 .map_err(|e| RemoteSubscriptionError::Connect(e.to_string()))?;
59 if let Some(t) = token {
60 client.authenticate(t);
61 }
62 let sub = client.subscribe(rql).await.map_err(|e| RemoteSubscriptionError::Subscribe(e.to_string()))?;
63 let subscription_id = sub.subscription_id().to_string();
64 Ok(RemoteSubscription {
65 inner: sub,
66 subscription_id,
67 })
68}
69
70pub async fn proxy_remote<T, F>(
78 mut remote_sub: RemoteSubscription,
79 sender: mpsc::UnboundedSender<T>,
80 mut shutdown: watch::Receiver<bool>,
81 convert: F,
82) where
83 T: Send + 'static,
84 F: Fn(Vec<Frame>) -> T,
85{
86 loop {
87 select! {
88 frames = remote_sub.inner.recv() => {
89 match frames {
90 Some(frames) => {
91 if sender.send(convert(frames)).is_err() {
92 break;
93 }
94 }
95 None => break,
96 }
97 }
98 _ = sender.closed() => break,
99 _ = shutdown.changed() => break,
100 }
101 }
102}