reifydb_sub_server/
remote.rs1use std::fmt;
10
11use reifydb_client::{GrpcClient, GrpcSubscription};
12use reifydb_type::value::frame::frame::Frame;
13use tokio::{
14 select,
15 sync::{mpsc, watch},
16};
17
/// Errors produced while setting up a subscription against a remote node.
///
/// Each variant carries a human-readable description of the underlying
/// failure, which is appended to the variant's message by [`fmt::Display`].
#[derive(Debug)]
pub enum RemoteSubscriptionError {
    /// Establishing the connection to the remote failed.
    Connect(String),
    /// The connection succeeded but the subscribe request failed.
    Subscribe(String),
}

impl fmt::Display for RemoteSubscriptionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Connect(e) => write!(f, "Failed to connect to remote: {}", e),
            Self::Subscribe(e) => write!(f, "Remote subscribe failed: {}", e),
        }
    }
}

// Implementing `Error` lets callers propagate this type with `?` into
// `Box<dyn Error>` (or any error wrapper built on the std trait).
impl std::error::Error for RemoteSubscriptionError {}
33
34pub struct RemoteSubscription {
36 inner: GrpcSubscription,
37 subscription_id: String,
38}
39
40impl RemoteSubscription {
41 pub fn subscription_id(&self) -> &str {
43 &self.subscription_id
44 }
45}
46
47pub async fn connect_remote(
49 address: &str,
50 query: &str,
51 token: Option<&str>,
52) -> Result<RemoteSubscription, RemoteSubscriptionError> {
53 let mut client =
54 GrpcClient::connect(address).await.map_err(|e| RemoteSubscriptionError::Connect(e.to_string()))?;
55 if let Some(t) = token {
56 client.authenticate(t);
57 }
58 let sub = client.subscribe(query).await.map_err(|e| RemoteSubscriptionError::Subscribe(e.to_string()))?;
59 let subscription_id = sub.subscription_id().to_string();
60 Ok(RemoteSubscription {
61 inner: sub,
62 subscription_id,
63 })
64}
65
66pub async fn proxy_remote<T, F>(
74 mut remote_sub: RemoteSubscription,
75 sender: mpsc::Sender<T>,
76 mut shutdown: watch::Receiver<bool>,
77 convert: F,
78) where
79 T: Send + 'static,
80 F: Fn(Vec<Frame>) -> T,
81{
82 loop {
83 select! {
84 frames = remote_sub.inner.recv() => {
85 match frames {
86 Some(frames) => {
87 if sender.send(convert(frames)).await.is_err() {
88 break;
89 }
90 }
91 None => break,
92 }
93 }
94 _ = sender.closed() => break,
95 _ = shutdown.changed() => break,
96 }
97 }
98}