reifydb_sub_server/
subscribe.rs1use std::fmt;
5
6#[cfg(not(reifydb_single_threaded))]
7use reifydb_core::error::diagnostic::internal::internal;
8use reifydb_core::interface::catalog::id::SubscriptionId;
9use reifydb_engine::engine::StandardEngine;
10#[cfg(not(reifydb_single_threaded))]
11use reifydb_type::error::Error;
12use reifydb_type::{
13 Result as TypeResult,
14 params::Params,
15 value::{Value, frame::frame::Frame, identity::IdentityId},
16};
17#[cfg(not(reifydb_single_threaded))]
18use tracing::debug;
19#[allow(unused_imports)]
20use tracing::error;
21
22pub enum CreateSubscriptionError {
24 Execute(ExecuteError),
25 ExtractionFailed,
26}
27
28impl fmt::Display for CreateSubscriptionError {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
32 CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
33 }
34 }
35}
36
37impl fmt::Debug for CreateSubscriptionError {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 match self {
40 CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
41 CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
42 }
43 }
44}
45
46impl From<ExecuteError> for CreateSubscriptionError {
47 fn from(err: ExecuteError) -> Self {
48 CreateSubscriptionError::Execute(err)
49 }
50}
51
52pub enum CreateSubscriptionResult {
54 Local(SubscriptionId),
55 Remote {
56 address: String,
57 query: String,
58 },
59}
60
61#[cfg(not(reifydb_single_threaded))]
62use reifydb_core::actors::server::Operation;
63#[cfg(not(reifydb_single_threaded))]
64use tokio::task::spawn_blocking;
65
66use crate::execute::ExecuteError;
67#[cfg(not(reifydb_single_threaded))]
68use crate::{
69 dispatch::dispatch_subscribe,
70 interceptor::{RequestContext, RequestMetadata},
71 state::AppState,
72};
73
74#[cfg(not(reifydb_single_threaded))]
76pub async fn create_subscription(
77 state: &AppState,
78 identity: IdentityId,
79 query: &str,
80 metadata: RequestMetadata,
81) -> Result<CreateSubscriptionResult, CreateSubscriptionError> {
82 let statement = format!("CREATE SUBSCRIPTION AS {{ {} }}", query);
83 debug!("Subscription statement: {}", statement);
84
85 let ctx = RequestContext {
86 identity,
87 operation: Operation::Subscribe,
88 statements: vec![statement],
89 params: Params::None,
90 metadata,
91 };
92
93 let (frames, _duration) = dispatch_subscribe(state, ctx).await?;
94
95 let frame = frames.first().ok_or(CreateSubscriptionError::ExtractionFailed)?;
96
97 if let Some(addr_col) = frame.columns.iter().find(|c| c.name == "remote_address") {
99 let address = if !addr_col.data.is_empty() {
100 match addr_col.data.get_value(0) {
101 Value::Utf8(s) => s,
102 _ => return Err(CreateSubscriptionError::ExtractionFailed),
103 }
104 } else {
105 return Err(CreateSubscriptionError::ExtractionFailed);
106 };
107
108 let rql = frame
109 .columns
110 .iter()
111 .find(|c| c.name == "remote_rql")
112 .and_then(|col| {
113 if !col.data.is_empty() {
114 match col.data.get_value(0) {
115 Value::Utf8(s) => Some(s),
116 _ => None,
117 }
118 } else {
119 None
120 }
121 })
122 .ok_or(CreateSubscriptionError::ExtractionFailed)?;
123
124 return Ok(CreateSubscriptionResult::Remote {
125 address,
126 query: rql,
127 });
128 }
129
130 frame.columns
132 .iter()
133 .find(|c| c.name == "subscription_id")
134 .and_then(|col| {
135 if !col.data.is_empty() {
136 Some(col.data.get_value(0))
137 } else {
138 None
139 }
140 })
141 .and_then(|value| match value {
142 Value::Uint8(id) => Some(SubscriptionId(id)),
143 other => {
144 error!("subscription_id column has wrong type: {:?}", other);
145 None
146 }
147 })
148 .map(CreateSubscriptionResult::Local)
149 .ok_or(CreateSubscriptionError::ExtractionFailed)
150}
151
152pub fn extract_subscription_id(frames: &[Frame]) -> Option<SubscriptionId> {
157 let frame = frames.first()?;
158 frame.columns
159 .iter()
160 .find(|c| c.name == "subscription_id")
161 .and_then(|col| {
162 if !col.data.is_empty() {
163 Some(col.data.get_value(0))
164 } else {
165 None
166 }
167 })
168 .and_then(|value| match value {
169 Value::Uint8(id) => Some(SubscriptionId(id)),
170 _ => None,
171 })
172}
173
174pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
176 let rql = format!("drop subscription if exists subscription_{};", subscription_id.0);
177 engine.admin_as(IdentityId::system(), &rql, Params::None).check()?;
178 Ok(())
179}
180
181#[cfg(not(reifydb_single_threaded))]
183pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
184 let engine = state.engine_clone();
185
186 spawn_blocking(move || cleanup_subscription_sync(&engine, subscription_id))
187 .await
188 .map_err(|e| Error(Box::new(internal(format!("Blocking task error: {:?}", e)))))?
189}