Skip to main content

reifydb_core/interface/
change.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem, sync::Arc};
5
6use reifydb_abi::flow::diff::DiffType;
7use reifydb_type::{Result, value::datetime::DateTime};
8use serde::{Deserialize, Serialize};
9use smallvec::SmallVec;
10
11use crate::{
12	common::CommitVersion,
13	interface::catalog::{flow::FlowNodeId, shape::ShapeId},
14	value::column::columns::Columns,
15};
16
/// Collection of [`Diff`]s carried by a [`Change`].
///
/// Backed by a `SmallVec` with an inline capacity of four elements.
pub type Diffs = SmallVec<[Diff; 4]>;
18
/// Identifies what produced a change: either a shape or a flow node.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ChangeOrigin {
	/// The change is attributed to the shape with this id.
	Shape(ShapeId),
	/// The change is attributed to the flow node with this id.
	Flow(FlowNodeId),
}
24
/// One unit of change: a batch of rows that was inserted, updated or removed.
///
/// Column data is held behind `Arc`, so cloning a `Diff` shares the underlying
/// `Columns` instead of copying them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Diff {
	/// Rows that were added; only a `post` image exists.
	Insert {
		/// Column data after the insert.
		post: Arc<Columns>,
		/// Explicit origin of this diff, or `None` when no per-diff origin was set.
		origin: Option<ChangeOrigin>,
	},
	/// Rows that were modified; both `pre` and `post` images are kept.
	Update {
		/// Column data before the update.
		pre: Arc<Columns>,
		/// Column data after the update.
		post: Arc<Columns>,
		/// Explicit origin of this diff, or `None` when no per-diff origin was set.
		origin: Option<ChangeOrigin>,
	},
	/// Rows that were deleted; only a `pre` image exists.
	Remove {
		/// Column data before the removal.
		pre: Arc<Columns>,
		/// Explicit origin of this diff, or `None` when no per-diff origin was set.
		origin: Option<ChangeOrigin>,
	},
}
41
42impl Diff {
43	pub fn insert(post: Columns) -> Self {
44		Self::Insert {
45			post: Arc::new(post),
46			origin: None,
47		}
48	}
49
50	pub fn update(pre: Columns, post: Columns) -> Self {
51		Self::Update {
52			pre: Arc::new(pre),
53			post: Arc::new(post),
54			origin: None,
55		}
56	}
57
58	pub fn remove(pre: Columns) -> Self {
59		Self::Remove {
60			pre: Arc::new(pre),
61			origin: None,
62		}
63	}
64
65	pub fn insert_arc(post: Arc<Columns>) -> Self {
66		Self::Insert {
67			post,
68			origin: None,
69		}
70	}
71
72	pub fn update_arc(pre: Arc<Columns>, post: Arc<Columns>) -> Self {
73		Self::Update {
74			pre,
75			post,
76			origin: None,
77		}
78	}
79
80	pub fn remove_arc(pre: Arc<Columns>) -> Self {
81		Self::Remove {
82			pre,
83			origin: None,
84		}
85	}
86
87	pub fn pre(&self) -> Option<&Columns> {
88		match self {
89			Diff::Insert {
90				..
91			} => None,
92			Diff::Update {
93				pre,
94				..
95			} => Some(pre),
96			Diff::Remove {
97				pre,
98				..
99			} => Some(pre),
100		}
101	}
102
103	pub fn post(&self) -> Option<&Columns> {
104		match self {
105			Diff::Insert {
106				post,
107				..
108			} => Some(post),
109			Diff::Update {
110				post,
111				..
112			} => Some(post),
113			Diff::Remove {
114				..
115			} => None,
116		}
117	}
118
119	pub fn kind(&self) -> DiffType {
120		match self {
121			Diff::Insert {
122				..
123			} => DiffType::Insert,
124			Diff::Update {
125				..
126			} => DiffType::Update,
127			Diff::Remove {
128				..
129			} => DiffType::Remove,
130		}
131	}
132
133	pub fn row_count(&self) -> usize {
134		match self {
135			Diff::Insert {
136				post,
137				..
138			} => post.row_count(),
139			Diff::Update {
140				post,
141				..
142			} => post.row_count(),
143			Diff::Remove {
144				pre,
145				..
146			} => pre.row_count(),
147		}
148	}
149
150	pub fn origin(&self) -> Option<&ChangeOrigin> {
151		match self {
152			Diff::Insert {
153				origin,
154				..
155			} => origin.as_ref(),
156			Diff::Update {
157				origin,
158				..
159			} => origin.as_ref(),
160			Diff::Remove {
161				origin,
162				..
163			} => origin.as_ref(),
164		}
165	}
166
167	pub fn set_origin(&mut self, new_origin: Option<ChangeOrigin>) {
168		match self {
169			Diff::Insert {
170				origin,
171				..
172			} => *origin = new_origin,
173			Diff::Update {
174				origin,
175				..
176			} => *origin = new_origin,
177			Diff::Remove {
178				origin,
179				..
180			} => *origin = new_origin,
181		}
182	}
183
184	pub fn effective_origin<'a>(&'a self, parent: &'a ChangeOrigin) -> &'a ChangeOrigin {
185		self.origin().unwrap_or(parent)
186	}
187}
188
/// A set of [`Diff`]s together with their origin, commit version and timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Change {
	/// Origin of the change. `Change::merge` treats it as the origin of every diff
	/// that does not carry an explicit one of its own.
	pub origin: ChangeOrigin,

	/// The diffs that make up this change.
	pub diffs: Diffs,

	/// Commit version this change is associated with.
	pub version: CommitVersion,

	/// Timestamp of the change; `Change::merge` keeps the latest one.
	pub changed_at: DateTime,
}
199
200impl Change {
201	pub fn from_shape(
202		shape: ShapeId,
203		version: CommitVersion,
204		diffs: impl Into<Diffs>,
205		changed_at: DateTime,
206	) -> Self {
207		Self {
208			origin: ChangeOrigin::Shape(shape),
209			diffs: diffs.into(),
210			version,
211			changed_at,
212		}
213	}
214
215	pub fn from_flow(
216		from: FlowNodeId,
217		version: CommitVersion,
218		diffs: impl Into<Diffs>,
219		changed_at: DateTime,
220	) -> Self {
221		Self {
222			origin: ChangeOrigin::Flow(from),
223			diffs: diffs.into(),
224			version,
225			changed_at,
226		}
227	}
228
229	pub fn row_count(&self) -> usize {
230		self.diffs.iter().map(Diff::row_count).sum()
231	}
232
233	pub fn merge(changes: Vec<Change>) -> Result<Change> {
234		let mut iter = changes.into_iter();
235		let mut merged = iter.next().expect("Change::merge requires at least one Change");
236		for mut ch in iter {
237			if ch.changed_at > merged.changed_at {
238				merged.changed_at = ch.changed_at;
239			}
240			if ch.origin != merged.origin {
241				for diff in ch.diffs.iter_mut() {
242					if diff.origin().is_none() {
243						diff.set_origin(Some(ch.origin.clone()));
244					}
245				}
246			}
247			merged.diffs.extend(ch.diffs);
248		}
249		merged.coalesce()?;
250		Ok(merged)
251	}
252
253	pub fn coalesce(&mut self) -> Result<()> {
254		if self.diffs.len() <= 1 {
255			return Ok(());
256		}
257		let original = mem::take(&mut self.diffs);
258		let mut merged: Diffs = SmallVec::with_capacity(original.len());
259		for diff in original {
260			if diff.row_count() == 0 {
261				continue;
262			}
263			let same_kind_and_origin = match (merged.last(), &diff) {
264				(Some(last), next) => last.kind() == next.kind() && last.origin() == next.origin(),
265				_ => false,
266			};
267			if same_kind_and_origin {
268				let last = merged.last_mut().expect("non-empty by same_kind_and_origin branch");
269				merge_into(last, diff)?;
270			} else {
271				merged.push(diff);
272			}
273		}
274		self.diffs = merged;
275		Ok(())
276	}
277}
278
279fn merge_into(target: &mut Diff, source: Diff) -> Result<()> {
280	fn unwrap_or_clone(arc: Arc<Columns>) -> Columns {
281		Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone())
282	}
283	match (target, source) {
284		(
285			Diff::Insert {
286				post: t,
287				..
288			},
289			Diff::Insert {
290				post: s,
291				..
292			},
293		) => Arc::make_mut(t).append_all(unwrap_or_clone(s)),
294		(
295			Diff::Update {
296				pre: tp,
297				post: tpost,
298				..
299			},
300			Diff::Update {
301				pre: sp,
302				post: spost,
303				..
304			},
305		) => {
306			Arc::make_mut(tp).append_all(unwrap_or_clone(sp))?;
307			Arc::make_mut(tpost).append_all(unwrap_or_clone(spost))
308		}
309		(
310			Diff::Remove {
311				pre: t,
312				..
313			},
314			Diff::Remove {
315				pre: s,
316				..
317			},
318		) => Arc::make_mut(t).append_all(unwrap_or_clone(s)),
319		_ => unreachable!("merge_into requires matching diff kinds"),
320	}
321}