Skip to main content

reifydb_remote_proxy/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![cfg_attr(not(debug_assertions), deny(warnings))]
6
7use std::fmt;
8
9use reifydb_client::{GrpcClient, GrpcSubscription, RawChangePayload, WireFormat};
10use tokio::{
11	select,
12	sync::{mpsc, watch},
13};
14
/// Error returned when establishing a remote subscription fails.
///
/// Each variant carries the underlying failure rendered as a string, so the
/// type stays independent of the client library's own error types.
#[derive(Debug)]
pub enum RemoteSubscriptionError {
	/// Opening the connection to the remote node failed.
	Connect(String),
	/// The connection was established but the subscribe request failed.
	Subscribe(String),
}

impl fmt::Display for RemoteSubscriptionError {
	/// Writes a human-readable message that prefixes the wrapped error text
	/// with the stage (connect or subscribe) that failed.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			Self::Connect(e) => write!(f, "Failed to connect to remote: {}", e),
			Self::Subscribe(e) => write!(f, "Remote subscribe failed: {}", e),
		}
	}
}

// Implementing `Error` lets callers box this type as `Box<dyn Error>` and use
// it with `?` in functions returning boxed/dynamic errors. The wrapped cause is
// already flattened to a `String`, so there is no `source()` to expose.
impl std::error::Error for RemoteSubscriptionError {}
30
31/// An active remote subscription, wrapping the underlying gRPC connection.
32pub struct RemoteSubscription {
33	inner: GrpcSubscription,
34	subscription_id: String,
35}
36
37impl RemoteSubscription {
38	/// The subscription ID assigned by the remote node.
39	pub fn subscription_id(&self) -> &str {
40		&self.subscription_id
41	}
42}
43
44/// Connect to a remote node and create a subscription.
45pub async fn connect_remote(
46	address: &str,
47	rql: &str,
48	token: Option<&str>,
49	wire_format: WireFormat,
50) -> Result<RemoteSubscription, RemoteSubscriptionError> {
51	let mut client = GrpcClient::connect(address, wire_format)
52		.await
53		.map_err(|e| RemoteSubscriptionError::Connect(e.to_string()))?;
54	if let Some(t) = token {
55		client.authenticate(t);
56	}
57	let sub = client.subscribe(rql).await.map_err(|e| RemoteSubscriptionError::Subscribe(e.to_string()))?;
58	let subscription_id = sub.subscription_id().to_string();
59	Ok(RemoteSubscription {
60		inner: sub,
61		subscription_id,
62	})
63}
64
65/// Proxy raw payloads from a remote subscription to a local channel.
66///
67/// Receives raw payloads from the remote subscription and converts them using the
68/// provided closure before sending through the local channel. Exits when:
69/// - The remote stream ends
70/// - The local channel closes (receiver dropped)
71/// - A shutdown signal is received
72pub async fn proxy_remote<T, F>(
73	mut remote_sub: RemoteSubscription,
74	sender: mpsc::UnboundedSender<T>,
75	mut shutdown: watch::Receiver<bool>,
76	convert: F,
77) where
78	T: Send + 'static,
79	F: Fn(RawChangePayload) -> T,
80{
81	loop {
82		select! {
83			payload = remote_sub.inner.recv_raw() => {
84				match payload {
85					Some(payload) => {
86						if sender.send(convert(payload)).is_err() {
87							break;
88						}
89					}
90					None => break,
91				}
92			}
93			_ = sender.closed() => break,
94			_ = shutdown.changed() => break,
95		}
96	}
97}
98
99/// Proxy raw payloads from a remote subscription into a caller-supplied sink closure.
100///
101/// Each received `RawChangePayload` is passed to `sink`. The sink returns `true` to
102/// continue, `false` to stop the proxy (e.g. downstream batch was torn down).
103/// Exits when:
104/// - The remote stream ends
105/// - `sink` returns `false`
106/// - A shutdown signal is received
107pub async fn proxy_remote_to_sink<F>(
108	mut remote_sub: RemoteSubscription,
109	mut shutdown: watch::Receiver<bool>,
110	mut sink: F,
111) where
112	F: FnMut(RawChangePayload) -> bool + Send + 'static,
113{
114	loop {
115		select! {
116			payload = remote_sub.inner.recv_raw() => {
117				match payload {
118					Some(payload) => {
119						if !sink(payload) {
120							break;
121						}
122					}
123					None => break,
124				}
125			}
126			_ = shutdown.changed() => break,
127		}
128	}
129}