Skip to main content

reifydb_sub_server/
subscribe.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::fmt;
5
6use reifydb_catalog::{drop_flow_by_name, drop_subscription};
7use reifydb_core::{
8	error::diagnostic::internal::internal,
9	interface::catalog::{
10		id::SubscriptionId,
11		subscription::{subscription_flow_name, subscription_flow_namespace},
12	},
13};
14use reifydb_engine::engine::StandardEngine;
15use reifydb_type::{
16	Result as TypeResult,
17	error::Error,
18	params::Params,
19	value::{Value, identity::IdentityId},
20};
21use tracing::{debug, error};
22
23use crate::{
24	execute::{ExecuteError, execute_subscription},
25	state::AppState,
26};
27
28/// Error type for subscription creation.
29pub enum CreateSubscriptionError {
30	Execute(ExecuteError),
31	ExtractionFailed,
32}
33
34impl fmt::Display for CreateSubscriptionError {
35	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36		match self {
37			CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
38			CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
39		}
40	}
41}
42
43impl fmt::Debug for CreateSubscriptionError {
44	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45		match self {
46			CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
47			CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
48		}
49	}
50}
51
52impl From<ExecuteError> for CreateSubscriptionError {
53	fn from(err: ExecuteError) -> Self {
54		CreateSubscriptionError::Execute(err)
55	}
56}
57
58/// Result of creating a subscription: either local or remote.
59pub enum CreateSubscriptionResult {
60	Local(SubscriptionId),
61	Remote {
62		address: String,
63		query: String,
64	},
65}
66
67/// Execute `CREATE SUBSCRIPTION AS { query }` and extract the subscription ID from the result.
68pub async fn create_subscription(
69	state: &AppState,
70	identity: IdentityId,
71	query: &str,
72) -> Result<CreateSubscriptionResult, CreateSubscriptionError> {
73	let statement = format!("CREATE SUBSCRIPTION AS {{ {} }}", query);
74	debug!("Subscription statement: {}", statement);
75
76	let frames = execute_subscription(
77		state.actor_system(),
78		state.engine_clone(),
79		statement,
80		identity,
81		Params::None,
82		state.query_timeout(),
83	)
84	.await?;
85
86	let frame = frames.first().ok_or(CreateSubscriptionError::ExtractionFailed)?;
87
88	// Check if result indicates a remote source
89	if let Some(addr_col) = frame.columns.iter().find(|c| c.name == "remote_address") {
90		let address = if addr_col.data.len() > 0 {
91			match addr_col.data.get_value(0) {
92				Value::Utf8(s) => s,
93				_ => return Err(CreateSubscriptionError::ExtractionFailed),
94			}
95		} else {
96			return Err(CreateSubscriptionError::ExtractionFailed);
97		};
98
99		let rql = frame
100			.columns
101			.iter()
102			.find(|c| c.name == "remote_rql")
103			.and_then(|col| {
104				if col.data.len() > 0 {
105					match col.data.get_value(0) {
106						Value::Utf8(s) => Some(s),
107						_ => None,
108					}
109				} else {
110					None
111				}
112			})
113			.ok_or(CreateSubscriptionError::ExtractionFailed)?;
114
115		return Ok(CreateSubscriptionResult::Remote {
116			address,
117			query: rql,
118		});
119	}
120
121	// Normal local path: extract subscription_id
122	frame.columns
123		.iter()
124		.find(|c| c.name == "subscription_id")
125		.and_then(|col| {
126			if col.data.len() > 0 {
127				Some(col.data.get_value(0))
128			} else {
129				None
130			}
131		})
132		.and_then(|value| match value {
133			Value::Uint8(id) => Some(SubscriptionId(id)),
134			other => {
135				error!("subscription_id column has wrong type: {:?}", other);
136				None
137			}
138		})
139		.map(CreateSubscriptionResult::Local)
140		.ok_or(CreateSubscriptionError::ExtractionFailed)
141}
142
143/// Synchronous cleanup: begin subscription txn, drop flow, drop subscription, commit.
144pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
145	let mut txn = engine.begin_subscription()?;
146	let flow_name = subscription_flow_name(subscription_id);
147	let namespace_id = subscription_flow_namespace();
148	drop_flow_by_name(txn.as_admin_mut(), namespace_id, &flow_name)?;
149	drop_subscription(txn.as_admin_mut(), subscription_id)?;
150	txn.commit()?;
151	Ok(())
152}
153
154/// Async cleanup via the compute pool.
155pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
156	let engine = state.engine_clone();
157	let system = state.actor_system();
158
159	system.compute(move || cleanup_subscription_sync(&engine, subscription_id))
160		.await
161		.map_err(|e| Error(internal(format!("Compute pool error: {:?}", e))))?
162}