reifydb_remote_proxy/
lib.rs1#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![cfg_attr(not(debug_assertions), deny(warnings))]
6
7use std::fmt;
13
14use reifydb_client::{GrpcClient, GrpcSubscription, RawChangePayload, WireFormat};
15use tokio::{
16 select,
17 sync::{mpsc, watch},
18};
19
20#[derive(Debug)]
22pub enum RemoteSubscriptionError {
23 Connect(String),
24 Subscribe(String),
25}
26
27impl fmt::Display for RemoteSubscriptionError {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 match self {
30 Self::Connect(e) => write!(f, "Failed to connect to remote: {}", e),
31 Self::Subscribe(e) => write!(f, "Remote subscribe failed: {}", e),
32 }
33 }
34}
35
36pub struct RemoteSubscription {
38 inner: GrpcSubscription,
39 subscription_id: String,
40}
41
42impl RemoteSubscription {
43 pub fn subscription_id(&self) -> &str {
45 &self.subscription_id
46 }
47}
48
49pub async fn connect_remote(
51 address: &str,
52 rql: &str,
53 token: Option<&str>,
54 wire_format: WireFormat,
55) -> Result<RemoteSubscription, RemoteSubscriptionError> {
56 let mut client = GrpcClient::connect(address, wire_format)
57 .await
58 .map_err(|e| RemoteSubscriptionError::Connect(e.to_string()))?;
59 if let Some(t) = token {
60 client.authenticate(t);
61 }
62 let sub = client.subscribe(rql).await.map_err(|e| RemoteSubscriptionError::Subscribe(e.to_string()))?;
63 let subscription_id = sub.subscription_id().to_string();
64 Ok(RemoteSubscription {
65 inner: sub,
66 subscription_id,
67 })
68}
69
70pub async fn proxy_remote<T, F>(
78 mut remote_sub: RemoteSubscription,
79 sender: mpsc::UnboundedSender<T>,
80 mut shutdown: watch::Receiver<bool>,
81 convert: F,
82) where
83 T: Send + 'static,
84 F: Fn(RawChangePayload) -> T,
85{
86 loop {
87 select! {
88 payload = remote_sub.inner.recv_raw() => {
89 match payload {
90 Some(payload) => {
91 if sender.send(convert(payload)).is_err() {
92 break;
93 }
94 }
95 None => break,
96 }
97 }
98 _ = sender.closed() => break,
99 _ = shutdown.changed() => break,
100 }
101 }
102}
103
104pub async fn proxy_remote_to_sink<F>(
113 mut remote_sub: RemoteSubscription,
114 mut shutdown: watch::Receiver<bool>,
115 mut sink: F,
116) where
117 F: FnMut(RawChangePayload) -> bool + Send + 'static,
118{
119 loop {
120 select! {
121 payload = remote_sub.inner.recv_raw() => {
122 match payload {
123 Some(payload) => {
124 if !sink(payload) {
125 break;
126 }
127 }
128 None => break,
129 }
130 }
131 _ = shutdown.changed() => break,
132 }
133 }
134}