Skip to main content

reifydb_remote_proxy/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Bridge that lets a local engine subscribe to a remote ReifyDB instance and forward incoming change payloads into
5//! a local channel or sink. Wraps the gRPC client, handles authentication, and proxies raw RBCF payloads through a
6//! conversion callback so the caller controls how remote events are typed.
7//!
8//! This is the only place in the workspace where external wire-format payloads are turned into events the local
9//! engine consumes; doing the conversion anywhere else would couple unrelated subsystems to the gRPC client and
10//! `wire-format` decoders.
11
12#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
13#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
14#![cfg_attr(not(debug_assertions), deny(warnings))]
15
16use std::fmt;
17
18use reifydb_client::{GrpcClient, GrpcSubscription, RawChangePayload, SubscriptionConfig, WireFormat};
19use tokio::{
20	select,
21	sync::{mpsc, watch},
22};
23
/// Failures that can occur while establishing a remote subscription.
///
/// Each variant carries the underlying failure rendered as text.
#[derive(Debug)]
pub enum RemoteSubscriptionError {
	/// Connecting to the remote instance failed.
	Connect(String),
	/// The subscribe request issued on the connected client failed.
	Subscribe(String),
}

impl fmt::Display for RemoteSubscriptionError {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			Self::Connect(reason) => write!(f, "Failed to connect to remote: {reason}"),
			Self::Subscribe(reason) => write!(f, "Remote subscribe failed: {reason}"),
		}
	}
}

// Implementing `Error` lets callers box this type as `Box<dyn Error>` or propagate it with `?` into generic
// error types. The variants only hold text, so there is no `source` to expose.
impl std::error::Error for RemoteSubscriptionError {}
38
39pub struct RemoteSubscription {
40	inner: GrpcSubscription,
41	subscription_id: String,
42}
43
44impl RemoteSubscription {
45	pub fn subscription_id(&self) -> &str {
46		&self.subscription_id
47	}
48}
49
50pub async fn connect_remote(
51	address: &str,
52	body: &str,
53	config: SubscriptionConfig,
54	token: Option<&str>,
55	wire_format: WireFormat,
56) -> Result<RemoteSubscription, RemoteSubscriptionError> {
57	let mut client = GrpcClient::connect(address, wire_format)
58		.await
59		.map_err(|e| RemoteSubscriptionError::Connect(e.to_string()))?;
60	if let Some(t) = token {
61		client.authenticate(t);
62	}
63	let sub =
64		client.subscribe(body, config).await.map_err(|e| RemoteSubscriptionError::Subscribe(e.to_string()))?;
65	let subscription_id = sub.subscription_id().to_string();
66	Ok(RemoteSubscription {
67		inner: sub,
68		subscription_id,
69	})
70}
71
72pub async fn proxy_remote<T, F>(
73	mut remote_sub: RemoteSubscription,
74	sender: mpsc::UnboundedSender<T>,
75	mut shutdown: watch::Receiver<bool>,
76	convert: F,
77) where
78	T: Send + 'static,
79	F: Fn(RawChangePayload) -> T,
80{
81	loop {
82		select! {
83			payload = remote_sub.inner.recv_raw() => {
84				match payload {
85					Some(payload) => {
86						if sender.send(convert(payload)).is_err() {
87							break;
88						}
89					}
90					None => break,
91				}
92			}
93			_ = sender.closed() => break,
94			_ = shutdown.changed() => break,
95		}
96	}
97}
98
99pub async fn proxy_remote_to_sink<F>(
100	mut remote_sub: RemoteSubscription,
101	mut shutdown: watch::Receiver<bool>,
102	mut sink: F,
103) where
104	F: FnMut(RawChangePayload) -> bool + Send + 'static,
105{
106	loop {
107		select! {
108			payload = remote_sub.inner.recv_raw() => {
109				match payload {
110					Some(payload) => {
111						if !sink(payload) {
112							break;
113						}
114					}
115					None => break,
116				}
117			}
118			_ = shutdown.changed() => break,
119		}
120	}
121}