Skip to main content

reifydb_core/interface/
cdc.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_type::value::datetime::DateTime;
5use serde::{Deserialize, Serialize};
6
7use crate::{
8	common::CommitVersion,
9	encoded::{key::EncodedKey, row::EncodedRow},
10	interface::change::Change,
11};
12
13#[repr(transparent)]
14#[derive(Debug, Clone, PartialOrd, PartialEq, Ord, Eq, Hash)]
15pub struct CdcConsumerId(pub(crate) String);
16
17impl CdcConsumerId {
18	pub fn new(id: impl Into<String>) -> Self {
19		let id = id.into();
20		assert_ne!(id, "__FLOW_CONSUMER");
21		Self(id)
22	}
23
24	pub fn flow_consumer() -> Self {
25		Self("__FLOW_CONSUMER".to_string())
26	}
27}
28
29impl AsRef<str> for CdcConsumerId {
30	fn as_ref(&self) -> &str {
31		&self.0
32	}
33}
34
35#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
36pub enum SystemChange {
37	Insert {
38		key: EncodedKey,
39		post: EncodedRow,
40	},
41	Update {
42		key: EncodedKey,
43		pre: EncodedRow,
44		post: EncodedRow,
45	},
46	Delete {
47		key: EncodedKey,
48		pre: Option<EncodedRow>,
49	},
50}
51
52impl SystemChange {
53	pub fn key(&self) -> &EncodedKey {
54		match self {
55			SystemChange::Insert {
56				key,
57				..
58			} => key,
59			SystemChange::Update {
60				key,
61				..
62			} => key,
63			SystemChange::Delete {
64				key,
65				..
66			} => key,
67		}
68	}
69
70	pub fn value_bytes(&self) -> usize {
71		match self {
72			SystemChange::Insert {
73				post,
74				..
75			} => post.len(),
76			SystemChange::Update {
77				pre,
78				post,
79				..
80			} => pre.len() + post.len(),
81			SystemChange::Delete {
82				pre,
83				..
84			} => pre.as_ref().map(|p| p.len()).unwrap_or(0),
85		}
86	}
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct Cdc {
91	pub version: CommitVersion,
92	pub timestamp: DateTime,
93
94	pub changes: Vec<Change>,
95
96	pub system_changes: Vec<SystemChange>,
97}
98
99impl Cdc {
100	pub fn new(
101		version: CommitVersion,
102		timestamp: DateTime,
103		changes: Vec<Change>,
104		system_changes: Vec<SystemChange>,
105	) -> Self {
106		Self {
107			version,
108			timestamp,
109			changes,
110			system_changes,
111		}
112	}
113}
114
115#[derive(Debug, Clone, PartialEq, Eq)]
116pub struct ConsumerState {
117	pub consumer_id: CdcConsumerId,
118	pub checkpoint: CommitVersion,
119}
120
121#[derive(Debug, Clone)]
122pub struct CdcBatch {
123	pub items: Vec<Cdc>,
124
125	pub has_more: bool,
126}
127
128impl CdcBatch {
129	pub fn empty() -> Self {
130		Self {
131			items: Vec::new(),
132			has_more: false,
133		}
134	}
135
136	pub fn is_empty(&self) -> bool {
137		self.items.is_empty()
138	}
139}