Skip to main content

reifydb_engine/
subscription.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{result::Result as StdResult, sync::Arc};
5
6use reifydb_core::{
7	common::CommitVersion, interface::catalog::id::SubscriptionId, metric::ExecutionMetrics,
8	value::column::columns::Columns,
9};
10use reifydb_rql::flow::flow::FlowDag;
11use reifydb_transaction::{multi::lease::VersionLeaseGuard, transaction::Transaction};
12use reifydb_type::{Result, error::Error as TypeError, value::identity::IdentityId};
13
14use crate::engine::StandardEngine;
15
16#[derive(Debug)]
17pub enum HydrateError {
18	SubscriptionNotFound,
19	UnsupportedSourceType,
20	RowCapExceeded {
21		cap: u64,
22	},
23	Engine(TypeError),
24	Internal(String),
25}
26
27impl From<TypeError> for HydrateError {
28	fn from(e: TypeError) -> Self {
29		HydrateError::Engine(e)
30	}
31}
32
33impl HydrateError {
34	pub fn is_version_evicted(&self) -> bool {
35		matches!(self, HydrateError::Engine(e) if e.0.code == "TXN_012")
36	}
37
38	pub fn wire_code(&self) -> &'static str {
39		match self {
40			Self::SubscriptionNotFound => "HYDRATION_FAILED",
41			Self::UnsupportedSourceType => "HYDRATION_UNSUPPORTED_SOURCE",
42			Self::RowCapExceeded {
43				..
44			} => "HYDRATION_TOO_LARGE",
45			Self::Engine(_) => {
46				if self.is_version_evicted() {
47					"HYDRATION_VERSION_EVICTED"
48				} else {
49					"HYDRATION_FAILED"
50				}
51			}
52			Self::Internal(_) => "HYDRATION_FAILED",
53		}
54	}
55
56	pub fn wire_message(&self, rql: &str, cap: u64) -> String {
57		match self {
58			Self::SubscriptionNotFound => "Subscription not found at hydration time".to_string(),
59			Self::UnsupportedSourceType => "hydration is not supported for SourceFlow / SourceSeries / SourceInlineData; use WITH { hydration: { enabled: false } } to subscribe without it".to_string(),
60			Self::RowCapExceeded { .. } => format!(
61				"Hydration exceeds subscribe.max_hydration_rows={}; add `TAKE N` upstream, lower with WITH {{ hydration: {{ max_rows: ... }} }}, or disable with WITH {{ hydration: {{ enabled: false }} }}. Query: {}",
62				cap, rql
63			),
64			Self::Engine(e) => {
65				if self.is_version_evicted() {
66					e.0.message.clone()
67				} else {
68					e.to_string()
69				}
70			}
71			Self::Internal(s) => s.clone(),
72		}
73	}
74}
75
76#[derive(Debug)]
77pub struct HydrateOutcome {
78	pub version: CommitVersion,
79	pub batches: Vec<Columns>,
80	pub metrics: ExecutionMetrics,
81}
82
83pub trait SubscriptionService: Send + Sync {
84	fn next_id(&self) -> SubscriptionId;
85
86	fn register_subscription(
87		&self,
88		id: SubscriptionId,
89		flow_dag: FlowDag,
90		column_names: Vec<String>,
91		txn: &mut Transaction<'_>,
92	) -> Result<()>;
93
94	fn unregister_subscription(&self, id: &SubscriptionId) -> Result<()>;
95
96	fn hydrate(
97		&self,
98		sub_id: SubscriptionId,
99		engine: &StandardEngine,
100		identity: IdentityId,
101		lease: VersionLeaseGuard,
102		max_rows: u64,
103	) -> StdResult<HydrateOutcome, HydrateError>;
104}
105
106pub type SubscriptionServiceRef = Arc<dyn SubscriptionService>;