reifydb_core/interface/
cdc.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_type::internal_error;
5use serde::{Deserialize, Serialize};
6
7use super::{CdcConsumerKeyRange, EncodableKey, QueryTransaction};
8use crate::{CommitVersion, EncodedKey, key::CdcConsumerKey, value::encoded::EncodedValues};
9
10#[repr(transparent)]
11#[derive(Debug, Clone, PartialOrd, PartialEq, Ord, Eq, Hash)]
12pub struct CdcConsumerId(pub(crate) String);
13
14impl CdcConsumerId {
15	pub fn new(id: impl Into<String>) -> Self {
16		let id = id.into();
17		assert_ne!(id, "__FLOW_CONSUMER");
18		Self(id)
19	}
20
21	pub fn flow_consumer() -> Self {
22		Self("__FLOW_CONSUMER".to_string())
23	}
24}
25
26impl AsRef<str> for CdcConsumerId {
27	fn as_ref(&self) -> &str {
28		&self.0
29	}
30}
31
32#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
33pub enum CdcChange {
34	Insert {
35		key: EncodedKey,
36		post: EncodedValues,
37	},
38	Update {
39		key: EncodedKey,
40		pre: EncodedValues,
41		post: EncodedValues,
42	},
43	Delete {
44		key: EncodedKey,
45		pre: Option<EncodedValues>,
46	},
47}
48
49/// Structure for storing CDC data with shared metadata
50#[derive(Debug, Clone, PartialEq)]
51pub struct Cdc {
52	pub version: CommitVersion,
53	pub timestamp: u64,
54	pub changes: Vec<CdcSequencedChange>,
55}
56
57impl Cdc {
58	pub fn new(version: CommitVersion, timestamp: u64, changes: Vec<CdcSequencedChange>) -> Self {
59		Self {
60			version,
61			timestamp,
62			changes,
63		}
64	}
65}
66
67/// Structure for individual changes within a transaction
68#[derive(Debug, Clone, PartialEq)]
69pub struct CdcSequencedChange {
70	pub sequence: u16,
71	pub change: CdcChange,
72}
73
74impl CdcSequencedChange {
75	pub fn key(&self) -> &EncodedKey {
76		match &self.change {
77			CdcChange::Insert {
78				key,
79				..
80			} => key,
81			CdcChange::Update {
82				key,
83				..
84			} => key,
85			CdcChange::Delete {
86				key,
87				..
88			} => key,
89		}
90	}
91}
92
93/// Represents the state of a CDC consumer
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct ConsumerState {
96	pub consumer_id: CdcConsumerId,
97	pub checkpoint: CommitVersion,
98}
99
100/// Retrieves the state of all CDC consumers
101pub fn get_all_consumer_states<T: QueryTransaction>(txn: &mut T) -> reifydb_type::Result<Vec<ConsumerState>> {
102	let mut states = Vec::new();
103
104	for multi in txn.range(CdcConsumerKeyRange::full_scan())? {
105		let key = CdcConsumerKey::decode(&multi.key)
106			.ok_or_else(|| internal_error!("Unable to decode CdConsumerKey"))?;
107
108		if multi.values.len() >= 8 {
109			let mut buffer = [0u8; 8];
110			buffer.copy_from_slice(&multi.values[0..8]);
111			let checkpoint = CommitVersion(u64::from_be_bytes(buffer));
112
113			states.push(ConsumerState {
114				consumer_id: key.consumer,
115				checkpoint,
116			});
117		}
118	}
119
120	Ok(states)
121}