Skip to main content

reifydb_core/interface/
cdc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use serde::{Deserialize, Serialize};
5
6use crate::{
7	common::CommitVersion,
8	encoded::{encoded::EncodedValues, key::EncodedKey},
9	interface::change::Change,
10};
11
12#[repr(transparent)]
13#[derive(Debug, Clone, PartialOrd, PartialEq, Ord, Eq, Hash)]
14pub struct CdcConsumerId(pub(crate) String);
15
16impl CdcConsumerId {
17	pub fn new(id: impl Into<String>) -> Self {
18		let id = id.into();
19		assert_ne!(id, "__FLOW_CONSUMER");
20		Self(id)
21	}
22
23	pub fn flow_consumer() -> Self {
24		Self("__FLOW_CONSUMER".to_string())
25	}
26}
27
28impl AsRef<str> for CdcConsumerId {
29	fn as_ref(&self) -> &str {
30		&self.0
31	}
32}
33
34/// Internal system/metadata change (flow registrations, catalog ops, etc.)
35/// Kept in encoded form for key-level inspection.
36#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
37pub enum SystemChange {
38	Insert {
39		key: EncodedKey,
40		post: EncodedValues,
41	},
42	Update {
43		key: EncodedKey,
44		pre: EncodedValues,
45		post: EncodedValues,
46	},
47	Delete {
48		key: EncodedKey,
49		pre: Option<EncodedValues>,
50	},
51}
52
53impl SystemChange {
54	/// Get the key for this change.
55	pub fn key(&self) -> &EncodedKey {
56		match self {
57			SystemChange::Insert {
58				key,
59				..
60			} => key,
61			SystemChange::Update {
62				key,
63				..
64			} => key,
65			SystemChange::Delete {
66				key,
67				..
68			} => key,
69		}
70	}
71
72	/// Calculate the approximate value bytes for this change (pre + post values).
73	pub fn value_bytes(&self) -> usize {
74		match self {
75			SystemChange::Insert {
76				post,
77				..
78			} => post.len(),
79			SystemChange::Update {
80				pre,
81				post,
82				..
83			} => pre.len() + post.len(),
84			SystemChange::Delete {
85				pre,
86				..
87			} => pre.as_ref().map(|p| p.len()).unwrap_or(0),
88		}
89	}
90}
91
92/// Structure for storing CDC data with shared metadata
93#[derive(Debug, Clone)]
94pub struct Cdc {
95	pub version: CommitVersion,
96	pub timestamp: u64,
97	/// Row-data changes in columnar format
98	pub changes: Vec<Change>,
99	/// Internal system changes
100	pub system_changes: Vec<SystemChange>,
101}
102
103impl Cdc {
104	pub fn new(
105		version: CommitVersion,
106		timestamp: u64,
107		changes: Vec<Change>,
108		system_changes: Vec<SystemChange>,
109	) -> Self {
110		Self {
111			version,
112			timestamp,
113			changes,
114			system_changes,
115		}
116	}
117}
118
119/// Represents the state of a CDC consumer
120#[derive(Debug, Clone, PartialEq, Eq)]
121pub struct ConsumerState {
122	pub consumer_id: CdcConsumerId,
123	pub checkpoint: CommitVersion,
124}
125
126/// A batch of CDC entries with continuation info.
127#[derive(Debug, Clone)]
128pub struct CdcBatch {
129	/// The CDC entries in this batch.
130	pub items: Vec<Cdc>,
131	/// Whether there are more items after this batch.
132	pub has_more: bool,
133}
134
135impl CdcBatch {
136	/// Creates an empty batch with no more results.
137	pub fn empty() -> Self {
138		Self {
139			items: Vec::new(),
140			has_more: false,
141		}
142	}
143
144	/// Returns true if this batch contains no items.
145	pub fn is_empty(&self) -> bool {
146		self.items.is_empty()
147	}
148}