Skip to main content

reifydb_core/interface/
change.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use std::mem;
5
6use reifydb_abi::flow::diff::DiffType;
7use reifydb_value::{Result, value::datetime::DateTime};
8use serde::{Deserialize, Serialize};
9use smallvec::SmallVec;
10
11use crate::{
12	common::CommitVersion,
13	interface::catalog::{flow::FlowNodeId, shape::ShapeId},
14	value::column::columns::Columns,
15};
16
17pub type Diffs = SmallVec<[Diff; 4]>;
18
19#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub enum ChangeOrigin {
21	Shape(ShapeId),
22	Flow(FlowNodeId),
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum Diff {
27	Insert {
28		post: Columns,
29		origin: Option<ChangeOrigin>,
30	},
31	Update {
32		pre: Columns,
33		post: Columns,
34		origin: Option<ChangeOrigin>,
35	},
36	Remove {
37		pre: Columns,
38		origin: Option<ChangeOrigin>,
39	},
40}
41
42impl Diff {
43	pub fn insert(post: Columns) -> Self {
44		Self::Insert {
45			post,
46			origin: None,
47		}
48	}
49
50	pub fn update(pre: Columns, post: Columns) -> Self {
51		Self::Update {
52			pre,
53			post,
54			origin: None,
55		}
56	}
57
58	pub fn remove(pre: Columns) -> Self {
59		Self::Remove {
60			pre,
61			origin: None,
62		}
63	}
64
65	pub fn pre(&self) -> Option<&Columns> {
66		match self {
67			Diff::Insert {
68				..
69			} => None,
70			Diff::Update {
71				pre,
72				..
73			} => Some(pre),
74			Diff::Remove {
75				pre,
76				..
77			} => Some(pre),
78		}
79	}
80
81	pub fn post(&self) -> Option<&Columns> {
82		match self {
83			Diff::Insert {
84				post,
85				..
86			} => Some(post),
87			Diff::Update {
88				post,
89				..
90			} => Some(post),
91			Diff::Remove {
92				..
93			} => None,
94		}
95	}
96
97	pub fn kind(&self) -> DiffType {
98		match self {
99			Diff::Insert {
100				..
101			} => DiffType::Insert,
102			Diff::Update {
103				..
104			} => DiffType::Update,
105			Diff::Remove {
106				..
107			} => DiffType::Remove,
108		}
109	}
110
111	pub fn row_count(&self) -> usize {
112		match self {
113			Diff::Insert {
114				post,
115				..
116			} => post.row_count(),
117			Diff::Update {
118				post,
119				..
120			} => post.row_count(),
121			Diff::Remove {
122				pre,
123				..
124			} => pre.row_count(),
125		}
126	}
127
128	pub fn origin(&self) -> Option<&ChangeOrigin> {
129		match self {
130			Diff::Insert {
131				origin,
132				..
133			} => origin.as_ref(),
134			Diff::Update {
135				origin,
136				..
137			} => origin.as_ref(),
138			Diff::Remove {
139				origin,
140				..
141			} => origin.as_ref(),
142		}
143	}
144
145	pub fn set_origin(&mut self, new_origin: Option<ChangeOrigin>) {
146		match self {
147			Diff::Insert {
148				origin,
149				..
150			} => *origin = new_origin,
151			Diff::Update {
152				origin,
153				..
154			} => *origin = new_origin,
155			Diff::Remove {
156				origin,
157				..
158			} => *origin = new_origin,
159		}
160	}
161
162	pub fn effective_origin<'a>(&'a self, parent: &'a ChangeOrigin) -> &'a ChangeOrigin {
163		self.origin().unwrap_or(parent)
164	}
165}
166
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct Change {
169	pub origin: ChangeOrigin,
170
171	pub diffs: Diffs,
172
173	pub version: CommitVersion,
174
175	pub changed_at: DateTime,
176}
177
178impl Change {
179	pub fn from_shape(
180		shape: ShapeId,
181		version: CommitVersion,
182		diffs: impl Into<Diffs>,
183		changed_at: DateTime,
184	) -> Self {
185		Self {
186			origin: ChangeOrigin::Shape(shape),
187			diffs: diffs.into(),
188			version,
189			changed_at,
190		}
191	}
192
193	pub fn from_flow(
194		from: FlowNodeId,
195		version: CommitVersion,
196		diffs: impl Into<Diffs>,
197		changed_at: DateTime,
198	) -> Self {
199		Self {
200			origin: ChangeOrigin::Flow(from),
201			diffs: diffs.into(),
202			version,
203			changed_at,
204		}
205	}
206
207	pub fn row_count(&self) -> usize {
208		self.diffs.iter().map(Diff::row_count).sum()
209	}
210
211	pub fn merge(changes: Vec<Change>) -> Result<Change> {
212		let mut iter = changes.into_iter();
213		let mut merged = iter.next().expect("Change::merge requires at least one Change");
214		for mut ch in iter {
215			if ch.changed_at > merged.changed_at {
216				merged.changed_at = ch.changed_at;
217			}
218			if ch.origin != merged.origin {
219				for diff in ch.diffs.iter_mut() {
220					if diff.origin().is_none() {
221						diff.set_origin(Some(ch.origin.clone()));
222					}
223				}
224			}
225			merged.diffs.extend(ch.diffs);
226		}
227		merged.coalesce()?;
228		Ok(merged)
229	}
230
231	pub fn coalesce(&mut self) -> Result<()> {
232		if self.diffs.len() <= 1 {
233			return Ok(());
234		}
235		let original = mem::take(&mut self.diffs);
236		let mut merged: Diffs = SmallVec::with_capacity(original.len());
237		for diff in original {
238			if diff.row_count() == 0 {
239				continue;
240			}
241			let same_kind_and_origin = match (merged.last(), &diff) {
242				(Some(last), next) => last.kind() == next.kind() && last.origin() == next.origin(),
243				_ => false,
244			};
245			if same_kind_and_origin {
246				let last = merged.last_mut().expect("non-empty by same_kind_and_origin branch");
247				merge_into(last, diff)?;
248			} else {
249				merged.push(diff);
250			}
251		}
252		self.diffs = merged;
253		Ok(())
254	}
255}
256
257fn merge_into(target: &mut Diff, source: Diff) -> Result<()> {
258	match (target, source) {
259		(
260			Diff::Insert {
261				post: t,
262				..
263			},
264			Diff::Insert {
265				post: s,
266				..
267			},
268		) => t.append_all(s),
269		(
270			Diff::Update {
271				pre: tp,
272				post: tpost,
273				..
274			},
275			Diff::Update {
276				pre: sp,
277				post: spost,
278				..
279			},
280		) => {
281			tp.append_all(sp)?;
282			tpost.append_all(spost)
283		}
284		(
285			Diff::Remove {
286				pre: t,
287				..
288			},
289			Diff::Remove {
290				pre: s,
291				..
292			},
293		) => t.append_all(s),
294		_ => unreachable!("merge_into requires matching diff kinds"),
295	}
296}