reifydb_sub_server/
subscribe.rs1use std::fmt;
5
6use reifydb_core::{error::diagnostic::internal::internal, interface::catalog::id::SubscriptionId};
7use reifydb_engine::engine::StandardEngine;
8use reifydb_type::{
9 Result as TypeResult,
10 error::Error,
11 params::Params,
12 value::{Value, identity::IdentityId},
13};
14use tokio::task::spawn_blocking;
15use tracing::{debug, error};
16
17use crate::{
18 execute::{ExecuteError, execute},
19 interceptor::{Operation, RequestContext, RequestMetadata},
20 state::AppState,
21};
22
23pub enum CreateSubscriptionError {
25 Execute(ExecuteError),
26 ExtractionFailed,
27}
28
29impl fmt::Display for CreateSubscriptionError {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 match self {
32 CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
33 CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
34 }
35 }
36}
37
38impl fmt::Debug for CreateSubscriptionError {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 match self {
41 CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
42 CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
43 }
44 }
45}
46
47impl From<ExecuteError> for CreateSubscriptionError {
48 fn from(err: ExecuteError) -> Self {
49 CreateSubscriptionError::Execute(err)
50 }
51}
52
53pub enum CreateSubscriptionResult {
55 Local(SubscriptionId),
56 Remote {
57 address: String,
58 query: String,
59 },
60}
61
62pub async fn create_subscription(
64 state: &AppState,
65 identity: IdentityId,
66 query: &str,
67 metadata: RequestMetadata,
68) -> Result<CreateSubscriptionResult, CreateSubscriptionError> {
69 let statement = format!("CREATE SUBSCRIPTION AS {{ {} }}", query);
70 debug!("Subscription statement: {}", statement);
71
72 let ctx = RequestContext {
73 identity,
74 operation: Operation::Subscribe,
75 statements: vec![statement],
76 params: Params::None,
77 metadata,
78 };
79
80 let (frames, _duration) =
81 execute(state.request_interceptors(), state.engine_clone(), ctx, state.query_timeout(), state.clock())
82 .await?;
83
84 let frame = frames.first().ok_or(CreateSubscriptionError::ExtractionFailed)?;
85
86 if let Some(addr_col) = frame.columns.iter().find(|c| c.name == "remote_address") {
88 let address = if !addr_col.data.is_empty() {
89 match addr_col.data.get_value(0) {
90 Value::Utf8(s) => s,
91 _ => return Err(CreateSubscriptionError::ExtractionFailed),
92 }
93 } else {
94 return Err(CreateSubscriptionError::ExtractionFailed);
95 };
96
97 let rql = frame
98 .columns
99 .iter()
100 .find(|c| c.name == "remote_rql")
101 .and_then(|col| {
102 if !col.data.is_empty() {
103 match col.data.get_value(0) {
104 Value::Utf8(s) => Some(s),
105 _ => None,
106 }
107 } else {
108 None
109 }
110 })
111 .ok_or(CreateSubscriptionError::ExtractionFailed)?;
112
113 return Ok(CreateSubscriptionResult::Remote {
114 address,
115 query: rql,
116 });
117 }
118
119 frame.columns
121 .iter()
122 .find(|c| c.name == "subscription_id")
123 .and_then(|col| {
124 if !col.data.is_empty() {
125 Some(col.data.get_value(0))
126 } else {
127 None
128 }
129 })
130 .and_then(|value| match value {
131 Value::Uint8(id) => Some(SubscriptionId(id)),
132 other => {
133 error!("subscription_id column has wrong type: {:?}", other);
134 None
135 }
136 })
137 .map(CreateSubscriptionResult::Local)
138 .ok_or(CreateSubscriptionError::ExtractionFailed)
139}
140
141pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
143 let rql = format!("drop subscription if exists subscription_{};", subscription_id.0);
144 engine.admin_as(IdentityId::system(), &rql, Params::None)?;
145 Ok(())
146}
147
148pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
150 let engine = state.engine_clone();
151
152 spawn_blocking(move || cleanup_subscription_sync(&engine, subscription_id))
153 .await
154 .map_err(|e| Error(Box::new(internal(format!("Blocking task error: {:?}", e)))))?
155}