reifydb_remote_proxy/
lib.rs1#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
13#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
14#![cfg_attr(not(debug_assertions), deny(warnings))]
15
16use std::fmt;
17
18use reifydb_client::{GrpcClient, GrpcSubscription, RawChangePayload, SubscriptionConfig, WireFormat};
19use tokio::{
20 select,
21 sync::{mpsc, watch},
22};
23
24#[derive(Debug)]
25pub enum RemoteSubscriptionError {
26 Connect(String),
27 Subscribe(String),
28}
29
30impl fmt::Display for RemoteSubscriptionError {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 Self::Connect(e) => write!(f, "Failed to connect to remote: {}", e),
34 Self::Subscribe(e) => write!(f, "Remote subscribe failed: {}", e),
35 }
36 }
37}
38
39pub struct RemoteSubscription {
40 inner: GrpcSubscription,
41 subscription_id: String,
42}
43
44impl RemoteSubscription {
45 pub fn subscription_id(&self) -> &str {
46 &self.subscription_id
47 }
48}
49
50pub async fn connect_remote(
51 address: &str,
52 body: &str,
53 config: SubscriptionConfig,
54 token: Option<&str>,
55 wire_format: WireFormat,
56) -> Result<RemoteSubscription, RemoteSubscriptionError> {
57 let mut client = GrpcClient::connect(address, wire_format)
58 .await
59 .map_err(|e| RemoteSubscriptionError::Connect(e.to_string()))?;
60 if let Some(t) = token {
61 client.authenticate(t);
62 }
63 let sub =
64 client.subscribe(body, config).await.map_err(|e| RemoteSubscriptionError::Subscribe(e.to_string()))?;
65 let subscription_id = sub.subscription_id().to_string();
66 Ok(RemoteSubscription {
67 inner: sub,
68 subscription_id,
69 })
70}
71
72pub async fn proxy_remote<T, F>(
73 mut remote_sub: RemoteSubscription,
74 sender: mpsc::UnboundedSender<T>,
75 mut shutdown: watch::Receiver<bool>,
76 convert: F,
77) where
78 T: Send + 'static,
79 F: Fn(RawChangePayload) -> T,
80{
81 loop {
82 select! {
83 payload = remote_sub.inner.recv_raw() => {
84 match payload {
85 Some(payload) => {
86 if sender.send(convert(payload)).is_err() {
87 break;
88 }
89 }
90 None => break,
91 }
92 }
93 _ = sender.closed() => break,
94 _ = shutdown.changed() => break,
95 }
96 }
97}
98
99pub async fn proxy_remote_to_sink<F>(
100 mut remote_sub: RemoteSubscription,
101 mut shutdown: watch::Receiver<bool>,
102 mut sink: F,
103) where
104 F: FnMut(RawChangePayload) -> bool + Send + 'static,
105{
106 loop {
107 select! {
108 payload = remote_sub.inner.recv_raw() => {
109 match payload {
110 Some(payload) => {
111 if !sink(payload) {
112 break;
113 }
114 }
115 None => break,
116 }
117 }
118 _ = shutdown.changed() => break,
119 }
120 }
121}