Skip to main content

reifydb_sub_server/
subscribe.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::fmt;
5
6use reifydb_core::{error::diagnostic::internal::internal, interface::catalog::id::SubscriptionId};
7use reifydb_engine::engine::StandardEngine;
8use reifydb_type::{
9	Result as TypeResult,
10	error::Error,
11	params::Params,
12	value::{Value, identity::IdentityId},
13};
14use tokio::task::spawn_blocking;
15use tracing::{debug, error};
16
17use crate::{
18	execute::{ExecuteError, execute},
19	interceptor::{Operation, RequestContext, RequestMetadata},
20	state::AppState,
21};
22
23/// Error type for subscription creation.
24pub enum CreateSubscriptionError {
25	Execute(ExecuteError),
26	ExtractionFailed,
27}
28
29impl fmt::Display for CreateSubscriptionError {
30	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31		match self {
32			CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
33			CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
34		}
35	}
36}
37
38impl fmt::Debug for CreateSubscriptionError {
39	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40		match self {
41			CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
42			CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
43		}
44	}
45}
46
47impl From<ExecuteError> for CreateSubscriptionError {
48	fn from(err: ExecuteError) -> Self {
49		CreateSubscriptionError::Execute(err)
50	}
51}
52
53/// Result of creating a subscription: either local or remote.
54pub enum CreateSubscriptionResult {
55	Local(SubscriptionId),
56	Remote {
57		address: String,
58		query: String,
59	},
60}
61
62/// Execute `CREATE SUBSCRIPTION AS { query }` and extract the subscription ID from the result.
63pub async fn create_subscription(
64	state: &AppState,
65	identity: IdentityId,
66	query: &str,
67	metadata: RequestMetadata,
68) -> Result<CreateSubscriptionResult, CreateSubscriptionError> {
69	let statement = format!("CREATE SUBSCRIPTION AS {{ {} }}", query);
70	debug!("Subscription statement: {}", statement);
71
72	let ctx = RequestContext {
73		identity,
74		operation: Operation::Subscribe,
75		statements: vec![statement],
76		params: Params::None,
77		metadata,
78	};
79
80	let (frames, _duration) =
81		execute(state.request_interceptors(), state.engine_clone(), ctx, state.query_timeout(), state.clock())
82			.await?;
83
84	let frame = frames.first().ok_or(CreateSubscriptionError::ExtractionFailed)?;
85
86	// Check if result indicates a remote source
87	if let Some(addr_col) = frame.columns.iter().find(|c| c.name == "remote_address") {
88		let address = if !addr_col.data.is_empty() {
89			match addr_col.data.get_value(0) {
90				Value::Utf8(s) => s,
91				_ => return Err(CreateSubscriptionError::ExtractionFailed),
92			}
93		} else {
94			return Err(CreateSubscriptionError::ExtractionFailed);
95		};
96
97		let rql = frame
98			.columns
99			.iter()
100			.find(|c| c.name == "remote_rql")
101			.and_then(|col| {
102				if !col.data.is_empty() {
103					match col.data.get_value(0) {
104						Value::Utf8(s) => Some(s),
105						_ => None,
106					}
107				} else {
108					None
109				}
110			})
111			.ok_or(CreateSubscriptionError::ExtractionFailed)?;
112
113		return Ok(CreateSubscriptionResult::Remote {
114			address,
115			query: rql,
116		});
117	}
118
119	// Normal local path: extract subscription_id
120	frame.columns
121		.iter()
122		.find(|c| c.name == "subscription_id")
123		.and_then(|col| {
124			if !col.data.is_empty() {
125				Some(col.data.get_value(0))
126			} else {
127				None
128			}
129		})
130		.and_then(|value| match value {
131			Value::Uint8(id) => Some(SubscriptionId(id)),
132			other => {
133				error!("subscription_id column has wrong type: {:?}", other);
134				None
135			}
136		})
137		.map(CreateSubscriptionResult::Local)
138		.ok_or(CreateSubscriptionError::ExtractionFailed)
139}
140
141/// Synchronous cleanup: drop subscription via DDL.
142pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
143	let rql = format!("drop subscription if exists subscription_{};", subscription_id.0);
144	engine.admin_as(IdentityId::system(), &rql, Params::None)?;
145	Ok(())
146}
147
148/// Async cleanup via a blocking task.
149pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
150	let engine = state.engine_clone();
151
152	spawn_blocking(move || cleanup_subscription_sync(&engine, subscription_id))
153		.await
154		.map_err(|e| Error(Box::new(internal(format!("Blocking task error: {:?}", e)))))?
155}