Skip to main content

reifydb_sub_server/
remote.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Shared remote subscription support.
5//!
6//! Provides connection and proxy logic for remote subscriptions,
7//! used by both gRPC and WebSocket server subsystems.
8
9use std::fmt;
10
11use reifydb_client::{GrpcClient, GrpcSubscription};
12use reifydb_type::value::frame::frame::Frame;
13use tokio::{
14	select,
15	sync::{mpsc, watch},
16};
17
/// Error returned when setting up a remote subscription fails.
///
/// Each variant carries the stringified message of the underlying failure.
#[derive(Debug)]
pub enum RemoteSubscriptionError {
	/// Establishing the connection to the remote node failed.
	Connect(String),
	/// The connection was established but the subscribe request failed.
	Subscribe(String),
}

impl fmt::Display for RemoteSubscriptionError {
	/// Writes a human-readable message: a fixed prefix per variant followed by
	/// the carried detail string.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			Self::Connect(e) => write!(f, "Failed to connect to remote: {}", e),
			Self::Subscribe(e) => write!(f, "Remote subscribe failed: {}", e),
		}
	}
}

// Implementing `Error` lets callers box this type as `Box<dyn Error>` and
// propagate it with `?` into generic error types. The underlying cause is
// already flattened into a `String`, so there is no `source()` to expose.
impl std::error::Error for RemoteSubscriptionError {}
33
34/// An active remote subscription, wrapping the underlying gRPC connection.
35pub struct RemoteSubscription {
36	inner: GrpcSubscription,
37	subscription_id: String,
38}
39
40impl RemoteSubscription {
41	/// The subscription ID assigned by the remote node.
42	pub fn subscription_id(&self) -> &str {
43		&self.subscription_id
44	}
45}
46
47/// Connect to a remote node and create a subscription.
48pub async fn connect_remote(
49	address: &str,
50	query: &str,
51	token: Option<&str>,
52) -> Result<RemoteSubscription, RemoteSubscriptionError> {
53	let mut client =
54		GrpcClient::connect(address).await.map_err(|e| RemoteSubscriptionError::Connect(e.to_string()))?;
55	if let Some(t) = token {
56		client.authenticate(t);
57	}
58	let sub = client.subscribe(query).await.map_err(|e| RemoteSubscriptionError::Subscribe(e.to_string()))?;
59	let subscription_id = sub.subscription_id().to_string();
60	Ok(RemoteSubscription {
61		inner: sub,
62		subscription_id,
63	})
64}
65
66/// Proxy frames from a remote subscription to a local channel.
67///
68/// Receives frames from the remote subscription and converts them using the
69/// provided closure before sending through the local channel. Exits when:
70/// - The remote stream ends
71/// - The local channel closes (receiver dropped)
72/// - A shutdown signal is received
73pub async fn proxy_remote<T, F>(
74	mut remote_sub: RemoteSubscription,
75	sender: mpsc::Sender<T>,
76	mut shutdown: watch::Receiver<bool>,
77	convert: F,
78) where
79	T: Send + 'static,
80	F: Fn(Vec<Frame>) -> T,
81{
82	loop {
83		select! {
84			frames = remote_sub.inner.recv() => {
85				match frames {
86					Some(frames) => {
87						if sender.send(convert(frames)).await.is_err() {
88							break;
89						}
90					}
91					None => break,
92				}
93			}
94			_ = sender.closed() => break,
95			_ = shutdown.changed() => break,
96		}
97	}
98}