reifydb_sub_server/
subscribe.rs1use std::fmt;
5
6use reifydb_catalog::{drop_flow_by_name, drop_subscription};
7use reifydb_core::{
8 error::diagnostic::internal::internal,
9 interface::catalog::{
10 id::SubscriptionId,
11 subscription::{subscription_flow_name, subscription_flow_namespace},
12 },
13};
14use reifydb_engine::engine::StandardEngine;
15use reifydb_type::{
16 Result as TypeResult,
17 error::Error,
18 params::Params,
19 value::{Value, identity::IdentityId},
20};
21use tracing::{debug, error};
22
23use crate::{
24 execute::{ExecuteError, execute_admin},
25 state::AppState,
26};
27
28pub enum CreateSubscriptionError {
30 Execute(ExecuteError),
31 ExtractionFailed,
32}
33
34impl fmt::Display for CreateSubscriptionError {
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 match self {
37 CreateSubscriptionError::Execute(e) => write!(f, "{}", e),
38 CreateSubscriptionError::ExtractionFailed => write!(f, "Failed to extract subscription ID"),
39 }
40 }
41}
42
43impl fmt::Debug for CreateSubscriptionError {
44 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 match self {
46 CreateSubscriptionError::Execute(e) => f.debug_tuple("Execute").field(e).finish(),
47 CreateSubscriptionError::ExtractionFailed => write!(f, "ExtractionFailed"),
48 }
49 }
50}
51
52impl From<ExecuteError> for CreateSubscriptionError {
53 fn from(err: ExecuteError) -> Self {
54 CreateSubscriptionError::Execute(err)
55 }
56}
57
58pub async fn create_subscription(
60 state: &AppState,
61 identity: IdentityId,
62 query: &str,
63) -> Result<SubscriptionId, CreateSubscriptionError> {
64 let statement = format!("CREATE SUBSCRIPTION AS {{ {} }}", query);
65 debug!("Subscription statement: {}", statement);
66
67 let frames = execute_admin(
68 state.actor_system(),
69 state.engine_clone(),
70 vec![statement],
71 identity,
72 Params::None,
73 state.query_timeout(),
74 )
75 .await?;
76
77 frames.first()
78 .and_then(|frame| frame.columns.iter().find(|c| c.name == "subscription_id"))
79 .and_then(|col| {
80 if col.data.len() > 0 {
81 Some(col.data.get_value(0))
82 } else {
83 None
84 }
85 })
86 .and_then(|value| match value {
87 Value::Uint8(id) => Some(SubscriptionId(id)),
88 other => {
89 error!("subscription_id column has wrong type: {:?}", other);
90 None
91 }
92 })
93 .ok_or(CreateSubscriptionError::ExtractionFailed)
94}
95
96pub fn cleanup_subscription_sync(engine: &StandardEngine, subscription_id: SubscriptionId) -> TypeResult<()> {
98 let mut txn = engine.begin_admin()?;
99 let flow_name = subscription_flow_name(subscription_id);
100 let namespace_id = subscription_flow_namespace();
101 drop_flow_by_name(&mut txn, namespace_id, &flow_name)?;
102 drop_subscription(&mut txn, subscription_id)?;
103 txn.commit()?;
104 Ok(())
105}
106
107pub async fn cleanup_subscription(state: &AppState, subscription_id: SubscriptionId) -> TypeResult<()> {
109 let engine = state.engine_clone();
110 let system = state.actor_system();
111
112 system.compute(move || cleanup_subscription_sync(&engine, subscription_id))
113 .await
114 .map_err(|e| Error(internal(format!("Compute pool error: {:?}", e))))?
115}