Skip to main content

reifydb_core/interface/
cdc.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_type::value::datetime::DateTime;
5use serde::{Deserialize, Serialize};
6
7use crate::{
8	common::CommitVersion,
9	encoded::{key::EncodedKey, row::EncodedRow},
10	interface::change::Change,
11};
12
13#[repr(transparent)]
14#[derive(Debug, Clone, PartialOrd, PartialEq, Ord, Eq, Hash)]
15pub struct CdcConsumerId(pub(crate) String);
16
17impl CdcConsumerId {
18	pub fn new(id: impl Into<String>) -> Self {
19		let id = id.into();
20		assert_ne!(id, "__FLOW_CONSUMER");
21		Self(id)
22	}
23
24	pub fn flow_consumer() -> Self {
25		Self("__FLOW_CONSUMER".to_string())
26	}
27}
28
29impl AsRef<str> for CdcConsumerId {
30	fn as_ref(&self) -> &str {
31		&self.0
32	}
33}
34
35/// Internal system/metadata change (flow registrations, catalog ops, etc.)
36/// Kept in encoded form for key-level inspection.
37#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
38pub enum SystemChange {
39	Insert {
40		key: EncodedKey,
41		post: EncodedRow,
42	},
43	Update {
44		key: EncodedKey,
45		pre: EncodedRow,
46		post: EncodedRow,
47	},
48	Delete {
49		key: EncodedKey,
50		pre: Option<EncodedRow>,
51	},
52}
53
54impl SystemChange {
55	/// Get the key for this change.
56	pub fn key(&self) -> &EncodedKey {
57		match self {
58			SystemChange::Insert {
59				key,
60				..
61			} => key,
62			SystemChange::Update {
63				key,
64				..
65			} => key,
66			SystemChange::Delete {
67				key,
68				..
69			} => key,
70		}
71	}
72
73	/// Calculate the approximate value bytes for this change (pre + post values).
74	pub fn value_bytes(&self) -> usize {
75		match self {
76			SystemChange::Insert {
77				post,
78				..
79			} => post.len(),
80			SystemChange::Update {
81				pre,
82				post,
83				..
84			} => pre.len() + post.len(),
85			SystemChange::Delete {
86				pre,
87				..
88			} => pre.as_ref().map(|p| p.len()).unwrap_or(0),
89		}
90	}
91}
92
93/// Structure for storing CDC data with shared metadata
94#[derive(Debug, Clone)]
95pub struct Cdc {
96	pub version: CommitVersion,
97	pub timestamp: DateTime,
98	/// Row-data changes in columnar format
99	pub changes: Vec<Change>,
100	/// Internal system changes
101	pub system_changes: Vec<SystemChange>,
102}
103
104impl Cdc {
105	pub fn new(
106		version: CommitVersion,
107		timestamp: DateTime,
108		changes: Vec<Change>,
109		system_changes: Vec<SystemChange>,
110	) -> Self {
111		Self {
112			version,
113			timestamp,
114			changes,
115			system_changes,
116		}
117	}
118}
119
120/// Represents the state of a CDC consumer
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct ConsumerState {
123	pub consumer_id: CdcConsumerId,
124	pub checkpoint: CommitVersion,
125}
126
127/// A batch of CDC entries with continuation info.
128#[derive(Debug, Clone)]
129pub struct CdcBatch {
130	/// The CDC entries in this batch.
131	pub items: Vec<Cdc>,
132	/// Whether there are more items after this batch.
133	pub has_more: bool,
134}
135
136impl CdcBatch {
137	/// Creates an empty batch with no more results.
138	pub fn empty() -> Self {
139		Self {
140			items: Vec::new(),
141			has_more: false,
142		}
143	}
144
145	/// Returns true if this batch contains no items.
146	pub fn is_empty(&self) -> bool {
147		self.items.is_empty()
148	}
149}