reifydb_sub_server/
subscribe.rs1use std::fmt;
5
6use reifydb_catalog::{drop_flow_by_name, drop_subscription};
7use reifydb_core::{
8 error::diagnostic::internal::internal,
9 interface::catalog::{
10 id::SubscriptionId,
11 subscription::{subscription_flow_name, subscription_flow_namespace},
12 },
13};
14use reifydb_engine::engine::StandardEngine;
15use reifydb_type::{
16 Result as TypeResult,
17 error::Error,
18 params::Params,
19 value::{Value, identity::IdentityId},
20};
21use tracing::{debug, error};
22
23use crate::{
24 execute::{ExecuteError, execute_subscription},
25 state::AppState,
26};
27
28pub enum CreateSubscriptionError {
30 Execute(ExecuteError),
31 ExtractionFailed,
32}
33
34impl fmt::Display for CreateSubscriptionError {
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 match self {
37 CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
38 CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
39 }
40 }
41}
42
43impl fmt::Debug for CreateSubscriptionError {
44 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 match self {
46 CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
47 CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
48 }
49 }
50}
51
52impl From<ExecuteError> for CreateSubscriptionError {
53 fn from(err: ExecuteError) -> Self {
54 CreateSubscriptionError::Execute(err)
55 }
56}
57
58pub enum CreateSubscriptionResult {
60 Local(SubscriptionId),
61 Remote {
62 address: String,
63 query: String,
64 },
65}
66
67pub async fn create_subscription(
69 state: &AppState,
70 identity: IdentityId,
71 query: &str,
72) -> Result<CreateSubscriptionResult, CreateSubscriptionError> {
73 let statement = format!("CREATE SUBSCRIPTION AS {{ {} }}", query);
74 debug!("Subscription statement: {}", statement);
75
76 let frames = execute_subscription(
77 state.actor_system(),
78 state.engine_clone(),
79 statement,
80 identity,
81 Params::None,
82 state.query_timeout(),
83 )
84 .await?;
85
86 let frame = frames.first().ok_or(CreateSubscriptionError::ExtractionFailed)?;
87
88 if let Some(addr_col) = frame.columns.iter().find(|c| c.name == "remote_address") {
90 let address = if addr_col.data.len() > 0 {
91 match addr_col.data.get_value(0) {
92 Value::Utf8(s) => s,
93 _ => return Err(CreateSubscriptionError::ExtractionFailed),
94 }
95 } else {
96 return Err(CreateSubscriptionError::ExtractionFailed);
97 };
98
99 let rql = frame
100 .columns
101 .iter()
102 .find(|c| c.name == "remote_rql")
103 .and_then(|col| {
104 if col.data.len() > 0 {
105 match col.data.get_value(0) {
106 Value::Utf8(s) => Some(s),
107 _ => None,
108 }
109 } else {
110 None
111 }
112 })
113 .ok_or(CreateSubscriptionError::ExtractionFailed)?;
114
115 return Ok(CreateSubscriptionResult::Remote {
116 address,
117 query: rql,
118 });
119 }
120
121 frame.columns
123 .iter()
124 .find(|c| c.name == "subscription_id")
125 .and_then(|col| {
126 if col.data.len() > 0 {
127 Some(col.data.get_value(0))
128 } else {
129 None
130 }
131 })
132 .and_then(|value| match value {
133 Value::Uint8(id) => Some(SubscriptionId(id)),
134 other => {
135 error!("subscription_id column has wrong type: {:?}", other);
136 None
137 }
138 })
139 .map(CreateSubscriptionResult::Local)
140 .ok_or(CreateSubscriptionError::ExtractionFailed)
141}
142
143pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
145 let mut txn = engine.begin_subscription()?;
146 let flow_name = subscription_flow_name(subscription_id);
147 let namespace_id = subscription_flow_namespace();
148 drop_flow_by_name(txn.as_admin_mut(), namespace_id, &flow_name)?;
149 drop_subscription(txn.as_admin_mut(), subscription_id)?;
150 txn.commit()?;
151 Ok(())
152}
153
154pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
156 let engine = state.engine_clone();
157 let system = state.actor_system();
158
159 system.compute(move || cleanup_subscription_sync(&engine, subscription_id))
160 .await
161 .map_err(|e| Error(internal(format!("Compute pool error: {:?}", e))))?
162}