reifydb_core/interface/
cdc.rs1use serde::{Deserialize, Serialize};
5
6use crate::{
7 common::CommitVersion,
8 encoded::{encoded::EncodedValues, key::EncodedKey},
9 interface::change::Change,
10};
11
12#[repr(transparent)]
13#[derive(Debug, Clone, PartialOrd, PartialEq, Ord, Eq, Hash)]
14pub struct CdcConsumerId(pub(crate) String);
15
16impl CdcConsumerId {
17 pub fn new(id: impl Into<String>) -> Self {
18 let id = id.into();
19 assert_ne!(id, "__FLOW_CONSUMER");
20 Self(id)
21 }
22
23 pub fn flow_consumer() -> Self {
24 Self("__FLOW_CONSUMER".to_string())
25 }
26}
27
28impl AsRef<str> for CdcConsumerId {
29 fn as_ref(&self) -> &str {
30 &self.0
31 }
32}
33
34#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
37pub enum SystemChange {
38 Insert {
39 key: EncodedKey,
40 post: EncodedValues,
41 },
42 Update {
43 key: EncodedKey,
44 pre: EncodedValues,
45 post: EncodedValues,
46 },
47 Delete {
48 key: EncodedKey,
49 pre: Option<EncodedValues>,
50 },
51}
52
53impl SystemChange {
54 pub fn key(&self) -> &EncodedKey {
56 match self {
57 SystemChange::Insert {
58 key,
59 ..
60 } => key,
61 SystemChange::Update {
62 key,
63 ..
64 } => key,
65 SystemChange::Delete {
66 key,
67 ..
68 } => key,
69 }
70 }
71
72 pub fn value_bytes(&self) -> usize {
74 match self {
75 SystemChange::Insert {
76 post,
77 ..
78 } => post.len(),
79 SystemChange::Update {
80 pre,
81 post,
82 ..
83 } => pre.len() + post.len(),
84 SystemChange::Delete {
85 pre,
86 ..
87 } => pre.as_ref().map(|p| p.len()).unwrap_or(0),
88 }
89 }
90}
91
92#[derive(Debug, Clone)]
94pub struct Cdc {
95 pub version: CommitVersion,
96 pub timestamp: u64,
97 pub changes: Vec<Change>,
99 pub system_changes: Vec<SystemChange>,
101}
102
103impl Cdc {
104 pub fn new(
105 version: CommitVersion,
106 timestamp: u64,
107 changes: Vec<Change>,
108 system_changes: Vec<SystemChange>,
109 ) -> Self {
110 Self {
111 version,
112 timestamp,
113 changes,
114 system_changes,
115 }
116 }
117}
118
119#[derive(Debug, Clone, PartialEq, Eq)]
121pub struct ConsumerState {
122 pub consumer_id: CdcConsumerId,
123 pub checkpoint: CommitVersion,
124}
125
126#[derive(Debug, Clone)]
128pub struct CdcBatch {
129 pub items: Vec<Cdc>,
131 pub has_more: bool,
133}
134
135impl CdcBatch {
136 pub fn empty() -> Self {
138 Self {
139 items: Vec::new(),
140 has_more: false,
141 }
142 }
143
144 pub fn is_empty(&self) -> bool {
146 self.items.is_empty()
147 }
148}