reifydb-engine 0.5.6

Query execution and processing engine for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::{result::Result as StdResult, sync::Arc};

use reifydb_core::{
	common::CommitVersion, interface::catalog::id::SubscriptionId, metric::ExecutionMetrics,
	value::column::columns::Columns,
};
use reifydb_rql::flow::flow::FlowDag;
use reifydb_transaction::{multi::lease::VersionLeaseGuard, transaction::Transaction};
use reifydb_type::{Result, error::Error as TypeError, value::identity::IdentityId};

use crate::engine::StandardEngine;

#[derive(Debug)]
pub enum HydrateError {
	SubscriptionNotFound,
	UnsupportedSourceType,
	RowCapExceeded {
		cap: u64,
	},
	Engine(TypeError),
	Internal(String),
}

impl From<TypeError> for HydrateError {
	fn from(e: TypeError) -> Self {
		HydrateError::Engine(e)
	}
}

impl HydrateError {
	pub fn is_version_evicted(&self) -> bool {
		matches!(self, HydrateError::Engine(e) if e.0.code == "TXN_012")
	}

	pub fn wire_code(&self) -> &'static str {
		match self {
			Self::SubscriptionNotFound => "HYDRATION_FAILED",
			Self::UnsupportedSourceType => "HYDRATION_UNSUPPORTED_SOURCE",
			Self::RowCapExceeded {
				..
			} => "HYDRATION_TOO_LARGE",
			Self::Engine(_) => {
				if self.is_version_evicted() {
					"HYDRATION_VERSION_EVICTED"
				} else {
					"HYDRATION_FAILED"
				}
			}
			Self::Internal(_) => "HYDRATION_FAILED",
		}
	}

	pub fn wire_message(&self, rql: &str, cap: u64) -> String {
		match self {
			Self::SubscriptionNotFound => "Subscription not found at hydration time".to_string(),
			Self::UnsupportedSourceType => "hydration is not supported for SourceFlow / SourceSeries / SourceInlineData; use WITH { hydration: { enabled: false } } to subscribe without it".to_string(),
			Self::RowCapExceeded { .. } => format!(
				"Hydration exceeds subscribe.max_hydration_rows={}; add `TAKE N` upstream, lower with WITH {{ hydration: {{ max_rows: ... }} }}, or disable with WITH {{ hydration: {{ enabled: false }} }}. Query: {}",
				cap, rql
			),
			Self::Engine(e) => {
				if self.is_version_evicted() {
					e.0.message.clone()
				} else {
					e.to_string()
				}
			}
			Self::Internal(s) => s.clone(),
		}
	}
}

#[derive(Debug)]
pub struct HydrateOutcome {
	pub version: CommitVersion,
	pub batches: Vec<Columns>,
	pub metrics: ExecutionMetrics,
}

pub trait SubscriptionService: Send + Sync {
	fn next_id(&self) -> SubscriptionId;

	fn register_subscription(
		&self,
		id: SubscriptionId,
		flow_dag: FlowDag,
		column_names: Vec<String>,
		txn: &mut Transaction<'_>,
	) -> Result<()>;

	fn unregister_subscription(&self, id: &SubscriptionId) -> Result<()>;

	fn hydrate(
		&self,
		sub_id: SubscriptionId,
		engine: &StandardEngine,
		identity: IdentityId,
		lease: VersionLeaseGuard,
		max_rows: u64,
	) -> StdResult<HydrateOutcome, HydrateError>;
}

pub type SubscriptionServiceRef = Arc<dyn SubscriptionService>;