Skip to main content

reifydb_core/interface/catalog/
subscription.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_type::value::r#type::Type;
5use serde::{Deserialize, Serialize};
6
7use crate::{
8	common::CommitVersion,
9	encoded::schema::{Schema, SchemaField},
10	interface::catalog::{
11		id::{NamespaceId, SubscriptionColumnId, SubscriptionId},
12		key::PrimaryKeyDef,
13	},
14};
15
/// Name of the implicit operation column carried by subscriptions.
pub const IMPLICIT_COLUMN_OP: &str = "_op";
18
19/// A column definition for a subscription.
20/// Simpler than regular ColumnDef - only has id, name, and type.
21#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
22pub struct SubscriptionColumnDef {
23	pub id: SubscriptionColumnId,
24	pub name: String,
25	pub ty: Type,
26}
27
28#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
29pub struct SubscriptionDef {
30	pub id: SubscriptionId,
31	// Note: Subscriptions do NOT have names - identified only by ID
32	pub columns: Vec<SubscriptionColumnDef>,
33	pub primary_key: Option<PrimaryKeyDef>,
34	pub acknowledged_version: CommitVersion,
35}
36
37impl SubscriptionDef {
38	/// Returns the implicit columns that are automatically added to all subscriptions
39	pub fn implicit_columns() -> Vec<SubscriptionColumnDef> {
40		vec![SubscriptionColumnDef {
41			id: SubscriptionColumnId(u64::MAX - 2), // Use high IDs for implicit columns
42			name: IMPLICIT_COLUMN_OP.to_string(),
43			ty: Type::Uint1, // 1=INSERT, 2=UPDATE, 3=DELETE
44		}]
45	}
46
47	/// Returns all columns including user-defined and implicit columns
48	pub fn all_columns(&self) -> Vec<SubscriptionColumnDef> {
49		let mut all = self.columns.clone();
50		all.extend(Self::implicit_columns());
51		all
52	}
53}
54
55impl From<&SubscriptionDef> for Schema {
56	fn from(value: &SubscriptionDef) -> Self {
57		// Use only user-defined columns for schema (implicit columns like _op removed)
58		let fields = value
59			.columns
60			.iter()
61			.map(|col| SchemaField::unconstrained(col.name.clone(), col.ty.clone()))
62			.collect();
63		Schema::new(fields)
64	}
65}
66
67/// Returns the flow name for a subscription using the deterministic naming convention.
68/// Subscription flows are always created in the system namespace.
69pub fn subscription_flow_name(id: SubscriptionId) -> String {
70	format!("subscription_{}", id.0)
71}
72
73/// Returns the namespace ID where subscription flows are created (system namespace).
74pub const fn subscription_flow_namespace() -> NamespaceId {
75	NamespaceId(1)
76}