Skip to main content

reifydb_sub_server/
subscribe.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::fmt;
5
6#[cfg(not(reifydb_single_threaded))]
7use reifydb_core::error::diagnostic::internal::internal;
8use reifydb_core::interface::catalog::id::SubscriptionId;
9use reifydb_engine::engine::StandardEngine;
10#[cfg(not(reifydb_single_threaded))]
11use reifydb_type::error::Error;
12use reifydb_type::{
13	Result as TypeResult,
14	params::Params,
15	value::{Value, frame::frame::Frame, identity::IdentityId},
16};
17#[cfg(not(reifydb_single_threaded))]
18use tracing::debug;
19#[allow(unused_imports)]
20use tracing::error;
21
/// Error type for subscription creation.
///
/// Returned by `create_subscription` when the `CREATE SUBSCRIPTION` statement
/// cannot be executed or when its result cannot be interpreted.
pub enum CreateSubscriptionError {
	/// Dispatching the subscription statement failed; carries the underlying
	/// [`ExecuteError`].
	Execute(ExecuteError),
	/// The statement produced a result, but the expected frame, column, or
	/// value (for example `subscription_id`) was missing or had an unexpected
	/// type.
	ExtractionFailed,
}
27
28impl fmt::Display for CreateSubscriptionError {
29	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30		match self {
31			CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
32			CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
33		}
34	}
35}
36
37impl fmt::Debug for CreateSubscriptionError {
38	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39		match self {
40			CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
41			CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
42		}
43	}
44}
45
46impl From<ExecuteError> for CreateSubscriptionError {
47	fn from(err: ExecuteError) -> Self {
48		CreateSubscriptionError::Execute(err)
49	}
50}
51
/// Result of creating a subscription: either local or remote.
pub enum CreateSubscriptionResult {
	/// The subscription was created locally; carries the ID read from the
	/// `subscription_id` column of the result frame.
	Local(SubscriptionId),
	/// The result frame pointed at a remote source instead of a local
	/// subscription.
	Remote {
		/// Value of the `remote_address` column.
		address: String,
		/// Value of the `remote_rql` column.
		rql: String,
		/// Value of the `remote_token` column, or `None` when that column is
		/// absent, empty, or not a UTF-8 value.
		token: Option<String>,
	},
}
61
62#[cfg(not(reifydb_single_threaded))]
63use reifydb_core::actors::server::Operation;
64#[cfg(not(reifydb_single_threaded))]
65use tokio::task::spawn_blocking;
66
67use crate::execute::ExecuteError;
68#[cfg(not(reifydb_single_threaded))]
69use crate::{
70	dispatch::dispatch_subscribe,
71	interceptor::{RequestContext, RequestMetadata},
72	state::AppState,
73};
74
75/// Execute `CREATE SUBSCRIPTION AS { query }` and extract the subscription ID from the result.
76#[cfg(not(reifydb_single_threaded))]
77pub async fn create_subscription(
78	state: &AppState,
79	identity: IdentityId,
80	rql: &str,
81	metadata: RequestMetadata,
82) -> Result<CreateSubscriptionResult, CreateSubscriptionError> {
83	let subscription_rql = format!("CREATE SUBSCRIPTION AS {{ {} }}", rql);
84	debug!("Subscription rql: {}", subscription_rql);
85
86	let ctx = RequestContext {
87		identity,
88		operation: Operation::Subscribe,
89		rql: subscription_rql,
90		params: Params::None,
91		metadata,
92	};
93
94	let (frames, _metrics) = dispatch_subscribe(state, ctx).await?;
95
96	let frame = frames.first().ok_or(CreateSubscriptionError::ExtractionFailed)?;
97
98	// Check if result indicates a remote source
99	if let Some(addr_col) = frame.columns.iter().find(|c| c.name == "remote_address") {
100		let address = if !addr_col.data.is_empty() {
101			match addr_col.data.get_value(0) {
102				Value::Utf8(s) => s,
103				_ => return Err(CreateSubscriptionError::ExtractionFailed),
104			}
105		} else {
106			return Err(CreateSubscriptionError::ExtractionFailed);
107		};
108
109		let rql = frame
110			.columns
111			.iter()
112			.find(|c| c.name == "remote_rql")
113			.and_then(|col| {
114				if !col.data.is_empty() {
115					match col.data.get_value(0) {
116						Value::Utf8(s) => Some(s),
117						_ => None,
118					}
119				} else {
120					None
121				}
122			})
123			.ok_or(CreateSubscriptionError::ExtractionFailed)?;
124
125		let token = frame.columns.iter().find(|c| c.name == "remote_token").and_then(|col| {
126			if !col.data.is_empty() {
127				match col.data.get_value(0) {
128					Value::Utf8(s) => Some(s),
129					_ => None,
130				}
131			} else {
132				None
133			}
134		});
135
136		return Ok(CreateSubscriptionResult::Remote {
137			address,
138			rql,
139			token,
140		});
141	}
142
143	// Normal local path: extract subscription_id
144	frame.columns
145		.iter()
146		.find(|c| c.name == "subscription_id")
147		.and_then(|col| {
148			if !col.data.is_empty() {
149				Some(col.data.get_value(0))
150			} else {
151				None
152			}
153		})
154		.and_then(|value| match value {
155			Value::Uint8(id) => Some(SubscriptionId(id)),
156			other => {
157				error!("subscription_id column has wrong type: {:?}", other);
158				None
159			}
160		})
161		.map(CreateSubscriptionResult::Local)
162		.ok_or(CreateSubscriptionError::ExtractionFailed)
163}
164
165/// Extract the subscription ID from frames returned by `engine.subscribe_as`.
166///
167/// The engine returns a single-row frame with a `subscription_id` column
168/// containing a `Value::Uint8(id)`.
169pub fn extract_subscription_id(frames: &[Frame]) -> Option<SubscriptionId> {
170	let frame = frames.first()?;
171	frame.columns
172		.iter()
173		.find(|c| c.name == "subscription_id")
174		.and_then(|col| {
175			if !col.data.is_empty() {
176				Some(col.data.get_value(0))
177			} else {
178				None
179			}
180		})
181		.and_then(|value| match value {
182			Value::Uint8(id) => Some(SubscriptionId(id)),
183			_ => None,
184		})
185}
186
187/// Synchronous cleanup: drop subscription via DDL.
188pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
189	let rql = format!("drop subscription if exists subscription_{};", subscription_id.0);
190	engine.admin_as(IdentityId::system(), &rql, Params::None).check()?;
191	Ok(())
192}
193
194/// Async cleanup via a blocking task.
195#[cfg(not(reifydb_single_threaded))]
196pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
197	let engine = state.engine_clone();
198
199	spawn_blocking(move || cleanup_subscription_sync(&engine, subscription_id))
200		.await
201		.map_err(|e| Error(Box::new(internal(format!("Blocking task error: {:?}", e)))))?
202}