Skip to main content

reifydb_remote_proxy/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3#![cfg_attr(not(debug_assertions), deny(clippy::disallowed_methods))]
4#![cfg_attr(debug_assertions, warn(clippy::disallowed_methods))]
5#![cfg_attr(not(debug_assertions), deny(warnings))]
6
7//! Remote subscription proxy.
8//!
9//! Provides connection and proxy logic for remote subscriptions,
10//! used by both gRPC and WebSocket server subsystems.
11
12use std::fmt;
13
14use reifydb_client::{GrpcClient, GrpcSubscription};
15use reifydb_type::value::frame::frame::Frame;
16use tokio::{
17	select,
18	sync::{mpsc, watch},
19};
20
/// Error returned when connecting to a remote subscription fails.
///
/// Each variant carries the stringified message of the underlying failure.
#[derive(Debug)]
pub enum RemoteSubscriptionError {
	/// Establishing the connection to the remote node failed.
	Connect(String),
	/// The subscribe request on the remote node failed.
	Subscribe(String),
}

impl fmt::Display for RemoteSubscriptionError {
	/// Writes a human-readable message prefixed with the stage that failed.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			Self::Connect(e) => write!(f, "Failed to connect to remote: {}", e),
			Self::Subscribe(e) => write!(f, "Remote subscribe failed: {}", e),
		}
	}
}

// Implementing `Error` lets callers propagate this type with `?` into
// `Box<dyn Error>` (and similar error containers). The variants only hold
// message strings, so the default `source()` (returning `None`) is correct.
impl std::error::Error for RemoteSubscriptionError {}
36
37/// An active remote subscription, wrapping the underlying gRPC connection.
38pub struct RemoteSubscription {
39	inner: GrpcSubscription,
40	subscription_id: String,
41}
42
43impl RemoteSubscription {
44	/// The subscription ID assigned by the remote node.
45	pub fn subscription_id(&self) -> &str {
46		&self.subscription_id
47	}
48}
49
50/// Connect to a remote node and create a subscription.
51pub async fn connect_remote(
52	address: &str,
53	query: &str,
54	token: Option<&str>,
55) -> Result<RemoteSubscription, RemoteSubscriptionError> {
56	let mut client =
57		GrpcClient::connect(address).await.map_err(|e| RemoteSubscriptionError::Connect(e.to_string()))?;
58	if let Some(t) = token {
59		client.authenticate(t);
60	}
61	let sub = client.subscribe(query).await.map_err(|e| RemoteSubscriptionError::Subscribe(e.to_string()))?;
62	let subscription_id = sub.subscription_id().to_string();
63	Ok(RemoteSubscription {
64		inner: sub,
65		subscription_id,
66	})
67}
68
69/// Proxy frames from a remote subscription to a local channel.
70///
71/// Receives frames from the remote subscription and converts them using the
72/// provided closure before sending through the local channel. Exits when:
73/// - The remote stream ends
74/// - The local channel closes (receiver dropped)
75/// - A shutdown signal is received
76pub async fn proxy_remote<T, F>(
77	mut remote_sub: RemoteSubscription,
78	sender: mpsc::UnboundedSender<T>,
79	mut shutdown: watch::Receiver<bool>,
80	convert: F,
81) where
82	T: Send + 'static,
83	F: Fn(Vec<Frame>) -> T,
84{
85	loop {
86		select! {
87			frames = remote_sub.inner.recv() => {
88				match frames {
89					Some(frames) => {
90						if sender.send(convert(frames)).is_err() {
91							break;
92						}
93					}
94					None => break,
95				}
96			}
97			_ = sender.closed() => break,
98			_ = shutdown.changed() => break,
99		}
100	}
101}