1use std::{mem, sync::Arc};
5
6use reifydb_abi::flow::diff::DiffType;
7use reifydb_type::{Result, value::datetime::DateTime};
8use serde::{Deserialize, Serialize};
9use smallvec::SmallVec;
10
11use crate::{
12 common::CommitVersion,
13 interface::catalog::{flow::FlowNodeId, shape::ShapeId},
14 value::column::columns::Columns,
15};
16
17pub type Diffs = SmallVec<[Diff; 4]>;
18
19#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub enum ChangeOrigin {
21 Shape(ShapeId),
22 Flow(FlowNodeId),
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum Diff {
27 Insert {
28 post: Arc<Columns>,
29 origin: Option<ChangeOrigin>,
30 },
31 Update {
32 pre: Arc<Columns>,
33 post: Arc<Columns>,
34 origin: Option<ChangeOrigin>,
35 },
36 Remove {
37 pre: Arc<Columns>,
38 origin: Option<ChangeOrigin>,
39 },
40}
41
42impl Diff {
43 pub fn insert(post: Columns) -> Self {
44 Self::Insert {
45 post: Arc::new(post),
46 origin: None,
47 }
48 }
49
50 pub fn update(pre: Columns, post: Columns) -> Self {
51 Self::Update {
52 pre: Arc::new(pre),
53 post: Arc::new(post),
54 origin: None,
55 }
56 }
57
58 pub fn remove(pre: Columns) -> Self {
59 Self::Remove {
60 pre: Arc::new(pre),
61 origin: None,
62 }
63 }
64
65 pub fn insert_arc(post: Arc<Columns>) -> Self {
66 Self::Insert {
67 post,
68 origin: None,
69 }
70 }
71
72 pub fn update_arc(pre: Arc<Columns>, post: Arc<Columns>) -> Self {
73 Self::Update {
74 pre,
75 post,
76 origin: None,
77 }
78 }
79
80 pub fn remove_arc(pre: Arc<Columns>) -> Self {
81 Self::Remove {
82 pre,
83 origin: None,
84 }
85 }
86
87 pub fn pre(&self) -> Option<&Columns> {
88 match self {
89 Diff::Insert {
90 ..
91 } => None,
92 Diff::Update {
93 pre,
94 ..
95 } => Some(pre),
96 Diff::Remove {
97 pre,
98 ..
99 } => Some(pre),
100 }
101 }
102
103 pub fn post(&self) -> Option<&Columns> {
104 match self {
105 Diff::Insert {
106 post,
107 ..
108 } => Some(post),
109 Diff::Update {
110 post,
111 ..
112 } => Some(post),
113 Diff::Remove {
114 ..
115 } => None,
116 }
117 }
118
119 pub fn kind(&self) -> DiffType {
120 match self {
121 Diff::Insert {
122 ..
123 } => DiffType::Insert,
124 Diff::Update {
125 ..
126 } => DiffType::Update,
127 Diff::Remove {
128 ..
129 } => DiffType::Remove,
130 }
131 }
132
133 pub fn row_count(&self) -> usize {
134 match self {
135 Diff::Insert {
136 post,
137 ..
138 } => post.row_count(),
139 Diff::Update {
140 post,
141 ..
142 } => post.row_count(),
143 Diff::Remove {
144 pre,
145 ..
146 } => pre.row_count(),
147 }
148 }
149
150 pub fn origin(&self) -> Option<&ChangeOrigin> {
151 match self {
152 Diff::Insert {
153 origin,
154 ..
155 } => origin.as_ref(),
156 Diff::Update {
157 origin,
158 ..
159 } => origin.as_ref(),
160 Diff::Remove {
161 origin,
162 ..
163 } => origin.as_ref(),
164 }
165 }
166
167 pub fn set_origin(&mut self, new_origin: Option<ChangeOrigin>) {
168 match self {
169 Diff::Insert {
170 origin,
171 ..
172 } => *origin = new_origin,
173 Diff::Update {
174 origin,
175 ..
176 } => *origin = new_origin,
177 Diff::Remove {
178 origin,
179 ..
180 } => *origin = new_origin,
181 }
182 }
183
184 pub fn effective_origin<'a>(&'a self, parent: &'a ChangeOrigin) -> &'a ChangeOrigin {
185 self.origin().unwrap_or(parent)
186 }
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct Change {
191 pub origin: ChangeOrigin,
192
193 pub diffs: Diffs,
194
195 pub version: CommitVersion,
196
197 pub changed_at: DateTime,
198}
199
200impl Change {
201 pub fn from_shape(
202 shape: ShapeId,
203 version: CommitVersion,
204 diffs: impl Into<Diffs>,
205 changed_at: DateTime,
206 ) -> Self {
207 Self {
208 origin: ChangeOrigin::Shape(shape),
209 diffs: diffs.into(),
210 version,
211 changed_at,
212 }
213 }
214
215 pub fn from_flow(
216 from: FlowNodeId,
217 version: CommitVersion,
218 diffs: impl Into<Diffs>,
219 changed_at: DateTime,
220 ) -> Self {
221 Self {
222 origin: ChangeOrigin::Flow(from),
223 diffs: diffs.into(),
224 version,
225 changed_at,
226 }
227 }
228
229 pub fn row_count(&self) -> usize {
230 self.diffs.iter().map(Diff::row_count).sum()
231 }
232
233 pub fn merge(changes: Vec<Change>) -> Result<Change> {
234 let mut iter = changes.into_iter();
235 let mut merged = iter.next().expect("Change::merge requires at least one Change");
236 for mut ch in iter {
237 if ch.changed_at > merged.changed_at {
238 merged.changed_at = ch.changed_at;
239 }
240 if ch.origin != merged.origin {
241 for diff in ch.diffs.iter_mut() {
242 if diff.origin().is_none() {
243 diff.set_origin(Some(ch.origin.clone()));
244 }
245 }
246 }
247 merged.diffs.extend(ch.diffs);
248 }
249 merged.coalesce()?;
250 Ok(merged)
251 }
252
253 pub fn coalesce(&mut self) -> Result<()> {
254 if self.diffs.len() <= 1 {
255 return Ok(());
256 }
257 let original = mem::take(&mut self.diffs);
258 let mut merged: Diffs = SmallVec::with_capacity(original.len());
259 for diff in original {
260 if diff.row_count() == 0 {
261 continue;
262 }
263 let same_kind_and_origin = match (merged.last(), &diff) {
264 (Some(last), next) => last.kind() == next.kind() && last.origin() == next.origin(),
265 _ => false,
266 };
267 if same_kind_and_origin {
268 let last = merged.last_mut().expect("non-empty by same_kind_and_origin branch");
269 merge_into(last, diff)?;
270 } else {
271 merged.push(diff);
272 }
273 }
274 self.diffs = merged;
275 Ok(())
276 }
277}
278
279fn merge_into(target: &mut Diff, source: Diff) -> Result<()> {
280 fn unwrap_or_clone(arc: Arc<Columns>) -> Columns {
281 Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone())
282 }
283 match (target, source) {
284 (
285 Diff::Insert {
286 post: t,
287 ..
288 },
289 Diff::Insert {
290 post: s,
291 ..
292 },
293 ) => Arc::make_mut(t).append_all(unwrap_or_clone(s)),
294 (
295 Diff::Update {
296 pre: tp,
297 post: tpost,
298 ..
299 },
300 Diff::Update {
301 pre: sp,
302 post: spost,
303 ..
304 },
305 ) => {
306 Arc::make_mut(tp).append_all(unwrap_or_clone(sp))?;
307 Arc::make_mut(tpost).append_all(unwrap_or_clone(spost))
308 }
309 (
310 Diff::Remove {
311 pre: t,
312 ..
313 },
314 Diff::Remove {
315 pre: s,
316 ..
317 },
318 ) => Arc::make_mut(t).append_all(unwrap_or_clone(s)),
319 _ => unreachable!("merge_into requires matching diff kinds"),
320 }
321}