reifydb_sub_server/
subscribe.rs1use std::fmt;
5
6use reifydb_catalog::{drop_flow_by_name, drop_subscription};
7use reifydb_core::{
8 error::diagnostic::internal::internal,
9 interface::catalog::{
10 id::SubscriptionId,
11 subscription::{subscription_flow_name, subscription_flow_namespace},
12 },
13};
14use reifydb_engine::engine::StandardEngine;
15use reifydb_type::{
16 Result as TypeResult,
17 error::Error,
18 params::Params,
19 value::{Value, identity::IdentityId},
20};
21use tracing::{debug, error};
22
23use crate::{
24 execute::{ExecuteError, execute},
25 interceptor::{Operation, RequestContext, RequestMetadata},
26 state::AppState,
27};
28
29pub enum CreateSubscriptionError {
31 Execute(ExecuteError),
32 ExtractionFailed,
33}
34
35impl fmt::Display for CreateSubscriptionError {
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 match self {
38 CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
39 CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
40 }
41 }
42}
43
44impl fmt::Debug for CreateSubscriptionError {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 match self {
47 CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
48 CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
49 }
50 }
51}
52
53impl From<ExecuteError> for CreateSubscriptionError {
54 fn from(err: ExecuteError) -> Self {
55 CreateSubscriptionError::Execute(err)
56 }
57}
58
59pub enum CreateSubscriptionResult {
61 Local(SubscriptionId),
62 Remote {
63 address: String,
64 query: String,
65 },
66}
67
68pub async fn create_subscription(
70 state: &AppState,
71 identity: IdentityId,
72 query: &str,
73 metadata: RequestMetadata,
74) -> Result<CreateSubscriptionResult, CreateSubscriptionError> {
75 let statement = format!("CREATE SUBSCRIPTION AS {{ {} }}", query);
76 debug!("Subscription statement: {}", statement);
77
78 let ctx = RequestContext {
79 identity,
80 operation: Operation::Subscribe,
81 statements: vec![statement],
82 params: Params::None,
83 metadata,
84 };
85
86 let (frames, _duration) = execute(
87 state.request_interceptors(),
88 state.actor_system(),
89 state.engine_clone(),
90 ctx,
91 state.query_timeout(),
92 state.clock(),
93 )
94 .await?;
95
96 let frame = frames.first().ok_or(CreateSubscriptionError::ExtractionFailed)?;
97
98 if let Some(addr_col) = frame.columns.iter().find(|c| c.name == "remote_address") {
100 let address = if addr_col.data.len() > 0 {
101 match addr_col.data.get_value(0) {
102 Value::Utf8(s) => s,
103 _ => return Err(CreateSubscriptionError::ExtractionFailed),
104 }
105 } else {
106 return Err(CreateSubscriptionError::ExtractionFailed);
107 };
108
109 let rql = frame
110 .columns
111 .iter()
112 .find(|c| c.name == "remote_rql")
113 .and_then(|col| {
114 if col.data.len() > 0 {
115 match col.data.get_value(0) {
116 Value::Utf8(s) => Some(s),
117 _ => None,
118 }
119 } else {
120 None
121 }
122 })
123 .ok_or(CreateSubscriptionError::ExtractionFailed)?;
124
125 return Ok(CreateSubscriptionResult::Remote {
126 address,
127 query: rql,
128 });
129 }
130
131 frame.columns
133 .iter()
134 .find(|c| c.name == "subscription_id")
135 .and_then(|col| {
136 if col.data.len() > 0 {
137 Some(col.data.get_value(0))
138 } else {
139 None
140 }
141 })
142 .and_then(|value| match value {
143 Value::Uint8(id) => Some(SubscriptionId(id)),
144 other => {
145 error!("subscription_id column has wrong type: {:?}", other);
146 None
147 }
148 })
149 .map(CreateSubscriptionResult::Local)
150 .ok_or(CreateSubscriptionError::ExtractionFailed)
151}
152
153pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
155 let mut txn = engine.begin_subscription(IdentityId::system())?;
156 let flow_name = subscription_flow_name(subscription_id);
157 let namespace_id = subscription_flow_namespace();
158 drop_flow_by_name(txn.as_admin_mut(), namespace_id, &flow_name)?;
159 drop_subscription(txn.as_admin_mut(), subscription_id)?;
160 txn.commit()?;
161 Ok(())
162}
163
164pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
166 let engine = state.engine_clone();
167 let system = state.actor_system();
168
169 system.compute(move || cleanup_subscription_sync(&engine, subscription_id))
170 .await
171 .map_err(|e| Error(internal(format!("Compute pool error: {:?}", e))))?
172}