Skip to main content

reifydb_sub_server/
subscribe.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::fmt;
5
6use reifydb_catalog::{drop_flow_by_name, drop_subscription};
7use reifydb_core::{
8	error::diagnostic::internal::internal,
9	interface::catalog::{
10		id::SubscriptionId,
11		subscription::{subscription_flow_name, subscription_flow_namespace},
12	},
13};
14use reifydb_engine::engine::StandardEngine;
15use reifydb_type::{
16	Result as TypeResult,
17	error::Error,
18	params::Params,
19	value::{Value, identity::IdentityId},
20};
21use tracing::{debug, error};
22
23use crate::{
24	execute::{ExecuteError, execute},
25	interceptor::{Operation, RequestContext, RequestMetadata},
26	state::AppState,
27};
28
29/// Error type for subscription creation.
30pub enum CreateSubscriptionError {
31	Execute(ExecuteError),
32	ExtractionFailed,
33}
34
35impl fmt::Display for CreateSubscriptionError {
36	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37		match self {
38			CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
39			CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
40		}
41	}
42}
43
44impl fmt::Debug for CreateSubscriptionError {
45	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46		match self {
47			CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
48			CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
49		}
50	}
51}
52
53impl From<ExecuteError> for CreateSubscriptionError {
54	fn from(err: ExecuteError) -> Self {
55		CreateSubscriptionError::Execute(err)
56	}
57}
58
59/// Result of creating a subscription: either local or remote.
60pub enum CreateSubscriptionResult {
61	Local(SubscriptionId),
62	Remote {
63		address: String,
64		query: String,
65	},
66}
67
68/// Execute `CREATE SUBSCRIPTION AS { query }` and extract the subscription ID from the result.
69pub async fn create_subscription(
70	state: &AppState,
71	identity: IdentityId,
72	query: &str,
73	metadata: RequestMetadata,
74) -> Result<CreateSubscriptionResult, CreateSubscriptionError> {
75	let statement = format!("CREATE SUBSCRIPTION AS {{ {} }}", query);
76	debug!("Subscription statement: {}", statement);
77
78	let ctx = RequestContext {
79		identity,
80		operation: Operation::Subscribe,
81		statements: vec![statement],
82		params: Params::None,
83		metadata,
84	};
85
86	let (frames, _duration) = execute(
87		state.request_interceptors(),
88		state.actor_system(),
89		state.engine_clone(),
90		ctx,
91		state.query_timeout(),
92		state.clock(),
93	)
94	.await?;
95
96	let frame = frames.first().ok_or(CreateSubscriptionError::ExtractionFailed)?;
97
98	// Check if result indicates a remote source
99	if let Some(addr_col) = frame.columns.iter().find(|c| c.name == "remote_address") {
100		let address = if addr_col.data.len() > 0 {
101			match addr_col.data.get_value(0) {
102				Value::Utf8(s) => s,
103				_ => return Err(CreateSubscriptionError::ExtractionFailed),
104			}
105		} else {
106			return Err(CreateSubscriptionError::ExtractionFailed);
107		};
108
109		let rql = frame
110			.columns
111			.iter()
112			.find(|c| c.name == "remote_rql")
113			.and_then(|col| {
114				if col.data.len() > 0 {
115					match col.data.get_value(0) {
116						Value::Utf8(s) => Some(s),
117						_ => None,
118					}
119				} else {
120					None
121				}
122			})
123			.ok_or(CreateSubscriptionError::ExtractionFailed)?;
124
125		return Ok(CreateSubscriptionResult::Remote {
126			address,
127			query: rql,
128		});
129	}
130
131	// Normal local path: extract subscription_id
132	frame.columns
133		.iter()
134		.find(|c| c.name == "subscription_id")
135		.and_then(|col| {
136			if col.data.len() > 0 {
137				Some(col.data.get_value(0))
138			} else {
139				None
140			}
141		})
142		.and_then(|value| match value {
143			Value::Uint8(id) => Some(SubscriptionId(id)),
144			other => {
145				error!("subscription_id column has wrong type: {:?}", other);
146				None
147			}
148		})
149		.map(CreateSubscriptionResult::Local)
150		.ok_or(CreateSubscriptionError::ExtractionFailed)
151}
152
153/// Synchronous cleanup: begin subscription txn, drop flow, drop subscription, commit.
154pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
155	let mut txn = engine.begin_subscription(IdentityId::system())?;
156	let flow_name = subscription_flow_name(subscription_id);
157	let namespace_id = subscription_flow_namespace();
158	drop_flow_by_name(txn.as_admin_mut(), namespace_id, &flow_name)?;
159	drop_subscription(txn.as_admin_mut(), subscription_id)?;
160	txn.commit()?;
161	Ok(())
162}
163
164/// Async cleanup via the compute pool.
165pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
166	let engine = state.engine_clone();
167	let system = state.actor_system();
168
169	system.compute(move || cleanup_subscription_sync(&engine, subscription_id))
170		.await
171		.map_err(|e| Error(internal(format!("Compute pool error: {:?}", e))))?
172}