Skip to main content

reifydb_remote_proxy/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![cfg_attr(not(debug_assertions), deny(warnings))]
6
7//! Remote subscription proxy.
8//!
9//! Provides connection and proxy logic for remote subscriptions,
10//! used by both gRPC and WebSocket server subsystems.
11
12use std::fmt;
13
14use reifydb_client::{GrpcClient, GrpcSubscription, RawChangePayload, WireFormat};
15use tokio::{
16	select,
17	sync::{mpsc, watch},
18};
19
/// Error returned when connecting to a remote subscription fails.
///
/// Each variant carries the stringified message of the underlying client
/// error. The type implements [`std::error::Error`], so it can be boxed,
/// propagated with `?` into generic error types, or attached as a source.
#[derive(Debug)]
pub enum RemoteSubscriptionError {
	/// Establishing the connection to the remote node failed.
	Connect(String),
	/// The connection was established but the subscribe request failed.
	Subscribe(String),
}

impl fmt::Display for RemoteSubscriptionError {
	/// Writes a human-readable message that names the failing step followed
	/// by the underlying error text.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			Self::Connect(reason) => write!(f, "Failed to connect to remote: {}", reason),
			Self::Subscribe(reason) => write!(f, "Remote subscribe failed: {}", reason),
		}
	}
}

// The underlying cause is kept only as text, so there is no `source()` to
// expose; the default trait methods are sufficient.
impl std::error::Error for RemoteSubscriptionError {}
35
36/// An active remote subscription, wrapping the underlying gRPC connection.
37pub struct RemoteSubscription {
38	inner: GrpcSubscription,
39	subscription_id: String,
40}
41
42impl RemoteSubscription {
43	/// The subscription ID assigned by the remote node.
44	pub fn subscription_id(&self) -> &str {
45		&self.subscription_id
46	}
47}
48
49/// Connect to a remote node and create a subscription.
50pub async fn connect_remote(
51	address: &str,
52	rql: &str,
53	token: Option<&str>,
54	wire_format: WireFormat,
55) -> Result<RemoteSubscription, RemoteSubscriptionError> {
56	let mut client = GrpcClient::connect(address, wire_format)
57		.await
58		.map_err(|e| RemoteSubscriptionError::Connect(e.to_string()))?;
59	if let Some(t) = token {
60		client.authenticate(t);
61	}
62	let sub = client.subscribe(rql).await.map_err(|e| RemoteSubscriptionError::Subscribe(e.to_string()))?;
63	let subscription_id = sub.subscription_id().to_string();
64	Ok(RemoteSubscription {
65		inner: sub,
66		subscription_id,
67	})
68}
69
70/// Proxy raw payloads from a remote subscription to a local channel.
71///
72/// Receives raw payloads from the remote subscription and converts them using the
73/// provided closure before sending through the local channel. Exits when:
74/// - The remote stream ends
75/// - The local channel closes (receiver dropped)
76/// - A shutdown signal is received
77pub async fn proxy_remote<T, F>(
78	mut remote_sub: RemoteSubscription,
79	sender: mpsc::UnboundedSender<T>,
80	mut shutdown: watch::Receiver<bool>,
81	convert: F,
82) where
83	T: Send + 'static,
84	F: Fn(RawChangePayload) -> T,
85{
86	loop {
87		select! {
88			payload = remote_sub.inner.recv_raw() => {
89				match payload {
90					Some(payload) => {
91						if sender.send(convert(payload)).is_err() {
92							break;
93						}
94					}
95					None => break,
96				}
97			}
98			_ = sender.closed() => break,
99			_ = shutdown.changed() => break,
100		}
101	}
102}
103
104/// Proxy raw payloads from a remote subscription into a caller-supplied sink closure.
105///
106/// Each received `RawChangePayload` is passed to `sink`. The sink returns `true` to
107/// continue, `false` to stop the proxy (e.g. downstream batch was torn down).
108/// Exits when:
109/// - The remote stream ends
110/// - `sink` returns `false`
111/// - A shutdown signal is received
112pub async fn proxy_remote_to_sink<F>(
113	mut remote_sub: RemoteSubscription,
114	mut shutdown: watch::Receiver<bool>,
115	mut sink: F,
116) where
117	F: FnMut(RawChangePayload) -> bool + Send + 'static,
118{
119	loop {
120		select! {
121			payload = remote_sub.inner.recv_raw() => {
122				match payload {
123					Some(payload) => {
124						if !sink(payload) {
125							break;
126						}
127					}
128					None => break,
129				}
130			}
131			_ = shutdown.changed() => break,
132		}
133	}
134}