Skip to main content

reifydb_sub_server/
subscribe.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::fmt;
5
6use reifydb_catalog::{drop_flow_by_name, drop_subscription};
7use reifydb_core::{
8	error::diagnostic::internal::internal,
9	interface::catalog::{
10		id::SubscriptionId,
11		subscription::{subscription_flow_name, subscription_flow_namespace},
12	},
13};
14use reifydb_engine::engine::StandardEngine;
15use reifydb_type::{
16	Result as TypeResult,
17	error::Error,
18	params::Params,
19	value::{Value, identity::IdentityId},
20};
21use tracing::{debug, error};
22
23use crate::{
24	execute::{ExecuteError, execute_admin},
25	state::AppState,
26};
27
28/// Error type for subscription creation.
29pub enum CreateSubscriptionError {
30	Execute(ExecuteError),
31	ExtractionFailed,
32}
33
34impl fmt::Display for CreateSubscriptionError {
35	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36		match self {
37			CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
38			CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
39		}
40	}
41}
42
43impl fmt::Debug for CreateSubscriptionError {
44	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45		match self {
46			CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
47			CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
48		}
49	}
50}
51
52impl From<ExecuteError> for CreateSubscriptionError {
53	fn from(err: ExecuteError) -> Self {
54		CreateSubscriptionError::Execute(err)
55	}
56}
57
58/// Execute `CREATE SUBSCRIPTION AS { query }` and extract the subscription ID from the result.
59pub async fn create_subscription(
60	state: &AppState,
61	identity: IdentityId,
62	query: &str,
63) -> Result<SubscriptionId, CreateSubscriptionError> {
64	let statement = format!("CREATE SUBSCRIPTION AS {{ {} }}", query);
65	debug!("Subscription statement: {}", statement);
66
67	let frames = execute_admin(
68		state.actor_system(),
69		state.engine_clone(),
70		vec![statement],
71		identity,
72		Params::None,
73		state.query_timeout(),
74	)
75	.await?;
76
77	frames.first()
78		.and_then(|frame| frame.columns.iter().find(|c| c.name == "subscription_id"))
79		.and_then(|col| {
80			if col.data.len() > 0 {
81				Some(col.data.get_value(0))
82			} else {
83				None
84			}
85		})
86		.and_then(|value| match value {
87			Value::Uint8(id) => Some(SubscriptionId(id)),
88			other => {
89				error!("subscription_id column has wrong type: {:?}", other);
90				None
91			}
92		})
93		.ok_or(CreateSubscriptionError::ExtractionFailed)
94}
95
96/// Synchronous cleanup: begin admin txn, drop flow, drop subscription, commit.
97pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
98	let mut txn = engine.begin_admin()?;
99	let flow_name = subscription_flow_name(subscription_id);
100	let namespace_id = subscription_flow_namespace();
101	drop_flow_by_name(&mut txn, namespace_id, &flow_name)?;
102	drop_subscription(&mut txn, subscription_id)?;
103	txn.commit()?;
104	Ok(())
105}
106
107/// Async cleanup via the compute pool.
108pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
109	let engine = state.engine_clone();
110	let system = state.actor_system();
111
112	system.compute(move || cleanup_subscription_sync(&engine, subscription_id))
113		.await
114		.map_err(|e| Error(internal(format!("Compute pool error: {:?}", e))))?
115}