Skip to main content

reifydb_core/interface/
change.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_abi::flow::diff::DiffType;
7use reifydb_type::value::datetime::DateTime;
8use serde::{Deserialize, Serialize};
9use smallvec::SmallVec;
10
11use crate::{
12	common::CommitVersion,
13	interface::catalog::{flow::FlowNodeId, shape::ShapeId},
14	value::column::columns::Columns,
15};
16
17/// Inline-storage container for `Change.diffs`. Most operator emissions
18/// produce 1-3 diffs per call; reserving 4 inline avoids the heap allocation
19/// in the typical case while spilling to the heap for fan-out-heavy ops.
20pub type Diffs = SmallVec<[Diff; 4]>;
21
22/// Origin of a change
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub enum ChangeOrigin {
25	Shape(ShapeId),
26	Flow(FlowNodeId),
27}
28
29/// Represents a single diff.
30///
31/// Carries `Arc<Columns>` so that cloning a `Diff` (or the enclosing
32/// `Change`) is a refcount bump rather than a deep copy of every column,
33/// and so that producers (e.g. `CdcProducerActor`) can hold onto a slab
34/// pool of `Arc<Columns>` and reuse them across calls when `strong_count`
35/// drops back to 1 after dispatch.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub enum Diff {
38	Insert {
39		post: Arc<Columns>,
40	},
41	Update {
42		pre: Arc<Columns>,
43		post: Arc<Columns>,
44	},
45	Remove {
46		pre: Arc<Columns>,
47	},
48}
49
50impl Diff {
51	/// Build an insert diff from an owned `Columns`. Wraps internally.
52	pub fn insert(post: Columns) -> Self {
53		Self::Insert {
54			post: Arc::new(post),
55		}
56	}
57
58	/// Build an update diff from owned `Columns`. Wraps internally.
59	pub fn update(pre: Columns, post: Columns) -> Self {
60		Self::Update {
61			pre: Arc::new(pre),
62			post: Arc::new(post),
63		}
64	}
65
66	/// Build a remove diff from an owned `Columns`. Wraps internally.
67	pub fn remove(pre: Columns) -> Self {
68		Self::Remove {
69			pre: Arc::new(pre),
70		}
71	}
72
73	/// Build an insert diff from an already-`Arc`'d `Columns`. Used by
74	/// the `CdcProducerActor` slab pool to avoid an extra `Arc::new`.
75	pub fn insert_arc(post: Arc<Columns>) -> Self {
76		Self::Insert {
77			post,
78		}
79	}
80
81	/// Build an update diff from already-`Arc`'d `Columns`.
82	pub fn update_arc(pre: Arc<Columns>, post: Arc<Columns>) -> Self {
83		Self::Update {
84			pre,
85			post,
86		}
87	}
88
89	/// Build a remove diff from an already-`Arc`'d `Columns`.
90	pub fn remove_arc(pre: Arc<Columns>) -> Self {
91		Self::Remove {
92			pre,
93		}
94	}
95
96	/// Pre-image columns (None for `Insert`).
97	pub fn pre(&self) -> Option<&Columns> {
98		match self {
99			Diff::Insert {
100				..
101			} => None,
102			Diff::Update {
103				pre,
104				..
105			} => Some(pre),
106			Diff::Remove {
107				pre,
108			} => Some(pre),
109		}
110	}
111
112	/// Post-image columns (None for `Remove`).
113	pub fn post(&self) -> Option<&Columns> {
114		match self {
115			Diff::Insert {
116				post,
117			} => Some(post),
118			Diff::Update {
119				post,
120				..
121			} => Some(post),
122			Diff::Remove {
123				..
124			} => None,
125		}
126	}
127
128	/// Kind tag; parity with `BorrowedDiff::kind`.
129	pub fn kind(&self) -> DiffType {
130		match self {
131			Diff::Insert {
132				..
133			} => DiffType::Insert,
134			Diff::Update {
135				..
136			} => DiffType::Update,
137			Diff::Remove {
138				..
139			} => DiffType::Remove,
140		}
141	}
142}
143
144/// A change with origin, diffs, version, and timestamp
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct Change {
147	/// Origin of this change
148	pub origin: ChangeOrigin,
149	/// The list of diffs (changes)
150	pub diffs: Diffs,
151	/// Version of this change.
152	pub version: CommitVersion,
153	/// Timestamp when this was changed
154	pub changed_at: DateTime,
155}
156
157impl Change {
158	/// Create a change from a shape (external) source
159	pub fn from_shape(
160		shape: ShapeId,
161		version: CommitVersion,
162		diffs: impl Into<Diffs>,
163		changed_at: DateTime,
164	) -> Self {
165		Self {
166			origin: ChangeOrigin::Shape(shape),
167			diffs: diffs.into(),
168			version,
169			changed_at,
170		}
171	}
172
173	/// Create a change from a flow node (internal)
174	pub fn from_flow(
175		from: FlowNodeId,
176		version: CommitVersion,
177		diffs: impl Into<Diffs>,
178		changed_at: DateTime,
179	) -> Self {
180		Self {
181			origin: ChangeOrigin::Flow(from),
182			diffs: diffs.into(),
183			version,
184			changed_at,
185		}
186	}
187}