reifydb_engine/
subscription.rs1use std::{result::Result as StdResult, sync::Arc};
5
6use reifydb_core::{
7 common::CommitVersion, interface::catalog::id::SubscriptionId, metric::ExecutionMetrics,
8 value::column::columns::Columns,
9};
10use reifydb_rql::flow::flow::FlowDag;
11use reifydb_transaction::{multi::lease::VersionLeaseGuard, transaction::Transaction};
12use reifydb_type::{Result, error::Error as TypeError, value::identity::IdentityId};
13
14use crate::engine::StandardEngine;
15
16#[derive(Debug)]
17pub enum HydrateError {
18 SubscriptionNotFound,
19 UnsupportedSourceType,
20 RowCapExceeded {
21 cap: u64,
22 },
23 Engine(TypeError),
24 Internal(String),
25}
26
27impl From<TypeError> for HydrateError {
28 fn from(e: TypeError) -> Self {
29 HydrateError::Engine(e)
30 }
31}
32
33impl HydrateError {
34 pub fn is_version_evicted(&self) -> bool {
35 matches!(self, HydrateError::Engine(e) if e.0.code == "TXN_012")
36 }
37
38 pub fn wire_code(&self) -> &'static str {
39 match self {
40 Self::SubscriptionNotFound => "HYDRATION_FAILED",
41 Self::UnsupportedSourceType => "HYDRATION_UNSUPPORTED_SOURCE",
42 Self::RowCapExceeded {
43 ..
44 } => "HYDRATION_TOO_LARGE",
45 Self::Engine(_) => {
46 if self.is_version_evicted() {
47 "HYDRATION_VERSION_EVICTED"
48 } else {
49 "HYDRATION_FAILED"
50 }
51 }
52 Self::Internal(_) => "HYDRATION_FAILED",
53 }
54 }
55
56 pub fn wire_message(&self, rql: &str, cap: u64) -> String {
57 match self {
58 Self::SubscriptionNotFound => "Subscription not found at hydration time".to_string(),
59 Self::UnsupportedSourceType => "hydration is not supported for SourceFlow / SourceSeries / SourceInlineData; use WITH { hydration: { enabled: false } } to subscribe without it".to_string(),
60 Self::RowCapExceeded { .. } => format!(
61 "Hydration exceeds subscribe.max_hydration_rows={}; add `TAKE N` upstream, lower with WITH {{ hydration: {{ max_rows: ... }} }}, or disable with WITH {{ hydration: {{ enabled: false }} }}. Query: {}",
62 cap, rql
63 ),
64 Self::Engine(e) => {
65 if self.is_version_evicted() {
66 e.0.message.clone()
67 } else {
68 e.to_string()
69 }
70 }
71 Self::Internal(s) => s.clone(),
72 }
73 }
74}
75
76#[derive(Debug)]
77pub struct HydrateOutcome {
78 pub version: CommitVersion,
79 pub batches: Vec<Columns>,
80 pub metrics: ExecutionMetrics,
81}
82
83pub trait SubscriptionService: Send + Sync {
84 fn next_id(&self) -> SubscriptionId;
85
86 fn register_subscription(
87 &self,
88 id: SubscriptionId,
89 flow_dag: FlowDag,
90 column_names: Vec<String>,
91 txn: &mut Transaction<'_>,
92 ) -> Result<()>;
93
94 fn unregister_subscription(&self, id: &SubscriptionId) -> Result<()>;
95
96 fn hydrate(
97 &self,
98 sub_id: SubscriptionId,
99 engine: &StandardEngine,
100 identity: IdentityId,
101 lease: VersionLeaseGuard,
102 max_rows: u64,
103 ) -> StdResult<HydrateOutcome, HydrateError>;
104}
105
106pub type SubscriptionServiceRef = Arc<dyn SubscriptionService>;