reifydb_sub_server/
subscribe.rs1use std::fmt;
5
6#[cfg(not(reifydb_single_threaded))]
7use reifydb_core::error::diagnostic::internal::internal;
8use reifydb_core::interface::catalog::id::SubscriptionId;
9use reifydb_engine::engine::StandardEngine;
10#[cfg(not(reifydb_single_threaded))]
11use reifydb_type::error::Error;
12use reifydb_type::{
13 Result as TypeResult,
14 params::Params,
15 value::{Value, frame::frame::Frame, identity::IdentityId},
16};
17#[cfg(not(reifydb_single_threaded))]
18use tracing::debug;
19#[allow(unused_imports)]
20use tracing::error;
21
22pub enum CreateSubscriptionError {
24 Execute(ExecuteError),
25 ExtractionFailed,
26}
27
28impl fmt::Display for CreateSubscriptionError {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
32 CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
33 }
34 }
35}
36
37impl fmt::Debug for CreateSubscriptionError {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 match self {
40 CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
41 CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
42 }
43 }
44}
45
46impl From<ExecuteError> for CreateSubscriptionError {
47 fn from(err: ExecuteError) -> Self {
48 CreateSubscriptionError::Execute(err)
49 }
50}
51
52pub enum CreateSubscriptionResult {
54 Local(SubscriptionId),
55 Remote {
56 address: String,
57 rql: String,
58 token: Option<String>,
59 },
60}
61
62#[cfg(not(reifydb_single_threaded))]
63use reifydb_core::actors::server::Operation;
64#[cfg(not(reifydb_single_threaded))]
65use tokio::task::spawn_blocking;
66
67use crate::execute::ExecuteError;
68#[cfg(not(reifydb_single_threaded))]
69use crate::{
70 dispatch::dispatch_subscribe,
71 interceptor::{RequestContext, RequestMetadata},
72 state::AppState,
73};
74
75#[cfg(not(reifydb_single_threaded))]
77pub async fn create_subscription(
78 state: &AppState,
79 identity: IdentityId,
80 rql: &str,
81 metadata: RequestMetadata,
82) -> Result<CreateSubscriptionResult, CreateSubscriptionError> {
83 let subscription_rql = format!("CREATE SUBSCRIPTION AS {{ {} }}", rql);
84 debug!("Subscription rql: {}", subscription_rql);
85
86 let ctx = RequestContext {
87 identity,
88 operation: Operation::Subscribe,
89 rql: subscription_rql,
90 params: Params::None,
91 metadata,
92 };
93
94 let (frames, _metrics) = dispatch_subscribe(state, ctx).await?;
95
96 let frame = frames.first().ok_or(CreateSubscriptionError::ExtractionFailed)?;
97
98 if let Some(addr_col) = frame.columns.iter().find(|c| c.name == "remote_address") {
100 let address = if !addr_col.data.is_empty() {
101 match addr_col.data.get_value(0) {
102 Value::Utf8(s) => s,
103 _ => return Err(CreateSubscriptionError::ExtractionFailed),
104 }
105 } else {
106 return Err(CreateSubscriptionError::ExtractionFailed);
107 };
108
109 let rql = frame
110 .columns
111 .iter()
112 .find(|c| c.name == "remote_rql")
113 .and_then(|col| {
114 if !col.data.is_empty() {
115 match col.data.get_value(0) {
116 Value::Utf8(s) => Some(s),
117 _ => None,
118 }
119 } else {
120 None
121 }
122 })
123 .ok_or(CreateSubscriptionError::ExtractionFailed)?;
124
125 let token = frame.columns.iter().find(|c| c.name == "remote_token").and_then(|col| {
126 if !col.data.is_empty() {
127 match col.data.get_value(0) {
128 Value::Utf8(s) => Some(s),
129 _ => None,
130 }
131 } else {
132 None
133 }
134 });
135
136 return Ok(CreateSubscriptionResult::Remote {
137 address,
138 rql,
139 token,
140 });
141 }
142
143 frame.columns
145 .iter()
146 .find(|c| c.name == "subscription_id")
147 .and_then(|col| {
148 if !col.data.is_empty() {
149 Some(col.data.get_value(0))
150 } else {
151 None
152 }
153 })
154 .and_then(|value| match value {
155 Value::Uint8(id) => Some(SubscriptionId(id)),
156 other => {
157 error!("subscription_id column has wrong type: {:?}", other);
158 None
159 }
160 })
161 .map(CreateSubscriptionResult::Local)
162 .ok_or(CreateSubscriptionError::ExtractionFailed)
163}
164
165pub fn extract_subscription_id(frames: &[Frame]) -> Option<SubscriptionId> {
170 let frame = frames.first()?;
171 frame.columns
172 .iter()
173 .find(|c| c.name == "subscription_id")
174 .and_then(|col| {
175 if !col.data.is_empty() {
176 Some(col.data.get_value(0))
177 } else {
178 None
179 }
180 })
181 .and_then(|value| match value {
182 Value::Uint8(id) => Some(SubscriptionId(id)),
183 _ => None,
184 })
185}
186
187pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
189 let rql = format!("drop subscription if exists subscription_{};", subscription_id.0);
190 engine.admin_as(IdentityId::system(), &rql, Params::None).check()?;
191 Ok(())
192}
193
194#[cfg(not(reifydb_single_threaded))]
196pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
197 let engine = state.engine_clone();
198
199 spawn_blocking(move || cleanup_subscription_sync(&engine, subscription_id))
200 .await
201 .map_err(|e| Error(Box::new(internal(format!("Blocking task error: {:?}", e)))))?
202}