reifydb_remote_proxy/
lib.rs1#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![cfg_attr(not(debug_assertions), deny(warnings))]
6
7use std::fmt;
8
9use reifydb_client::{GrpcClient, GrpcSubscription, RawChangePayload, WireFormat};
10use tokio::{
11 select,
12 sync::{mpsc, watch},
13};
14
15#[derive(Debug)]
17pub enum RemoteSubscriptionError {
18 Connect(String),
19 Subscribe(String),
20}
21
22impl fmt::Display for RemoteSubscriptionError {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 match self {
25 Self::Connect(e) => write!(f, "Failed to connect to remote: {}", e),
26 Self::Subscribe(e) => write!(f, "Remote subscribe failed: {}", e),
27 }
28 }
29}
30
31pub struct RemoteSubscription {
33 inner: GrpcSubscription,
34 subscription_id: String,
35}
36
37impl RemoteSubscription {
38 pub fn subscription_id(&self) -> &str {
40 &self.subscription_id
41 }
42}
43
44pub async fn connect_remote(
46 address: &str,
47 rql: &str,
48 token: Option<&str>,
49 wire_format: WireFormat,
50) -> Result<RemoteSubscription, RemoteSubscriptionError> {
51 let mut client = GrpcClient::connect(address, wire_format)
52 .await
53 .map_err(|e| RemoteSubscriptionError::Connect(e.to_string()))?;
54 if let Some(t) = token {
55 client.authenticate(t);
56 }
57 let sub = client.subscribe(rql).await.map_err(|e| RemoteSubscriptionError::Subscribe(e.to_string()))?;
58 let subscription_id = sub.subscription_id().to_string();
59 Ok(RemoteSubscription {
60 inner: sub,
61 subscription_id,
62 })
63}
64
65pub async fn proxy_remote<T, F>(
73 mut remote_sub: RemoteSubscription,
74 sender: mpsc::UnboundedSender<T>,
75 mut shutdown: watch::Receiver<bool>,
76 convert: F,
77) where
78 T: Send + 'static,
79 F: Fn(RawChangePayload) -> T,
80{
81 loop {
82 select! {
83 payload = remote_sub.inner.recv_raw() => {
84 match payload {
85 Some(payload) => {
86 if sender.send(convert(payload)).is_err() {
87 break;
88 }
89 }
90 None => break,
91 }
92 }
93 _ = sender.closed() => break,
94 _ = shutdown.changed() => break,
95 }
96 }
97}
98
99pub async fn proxy_remote_to_sink<F>(
108 mut remote_sub: RemoteSubscription,
109 mut shutdown: watch::Receiver<bool>,
110 mut sink: F,
111) where
112 F: FnMut(RawChangePayload) -> bool + Send + 'static,
113{
114 loop {
115 select! {
116 payload = remote_sub.inner.recv_raw() => {
117 match payload {
118 Some(payload) => {
119 if !sink(payload) {
120 break;
121 }
122 }
123 None => break,
124 }
125 }
126 _ = shutdown.changed() => break,
127 }
128 }
129}