1use std::mem;
5
6use reifydb_abi::flow::diff::DiffType;
7use reifydb_value::{Result, value::datetime::DateTime};
8use serde::{Deserialize, Serialize};
9use smallvec::SmallVec;
10
11use crate::{
12 common::CommitVersion,
13 interface::catalog::{flow::FlowNodeId, shape::ShapeId},
14 value::column::columns::Columns,
15};
16
17pub type Diffs = SmallVec<[Diff; 4]>;
18
19#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub enum ChangeOrigin {
21 Shape(ShapeId),
22 Flow(FlowNodeId),
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum Diff {
27 Insert {
28 post: Columns,
29 origin: Option<ChangeOrigin>,
30 },
31 Update {
32 pre: Columns,
33 post: Columns,
34 origin: Option<ChangeOrigin>,
35 },
36 Remove {
37 pre: Columns,
38 origin: Option<ChangeOrigin>,
39 },
40}
41
42impl Diff {
43 pub fn insert(post: Columns) -> Self {
44 Self::Insert {
45 post,
46 origin: None,
47 }
48 }
49
50 pub fn update(pre: Columns, post: Columns) -> Self {
51 Self::Update {
52 pre,
53 post,
54 origin: None,
55 }
56 }
57
58 pub fn remove(pre: Columns) -> Self {
59 Self::Remove {
60 pre,
61 origin: None,
62 }
63 }
64
65 pub fn pre(&self) -> Option<&Columns> {
66 match self {
67 Diff::Insert {
68 ..
69 } => None,
70 Diff::Update {
71 pre,
72 ..
73 } => Some(pre),
74 Diff::Remove {
75 pre,
76 ..
77 } => Some(pre),
78 }
79 }
80
81 pub fn post(&self) -> Option<&Columns> {
82 match self {
83 Diff::Insert {
84 post,
85 ..
86 } => Some(post),
87 Diff::Update {
88 post,
89 ..
90 } => Some(post),
91 Diff::Remove {
92 ..
93 } => None,
94 }
95 }
96
97 pub fn kind(&self) -> DiffType {
98 match self {
99 Diff::Insert {
100 ..
101 } => DiffType::Insert,
102 Diff::Update {
103 ..
104 } => DiffType::Update,
105 Diff::Remove {
106 ..
107 } => DiffType::Remove,
108 }
109 }
110
111 pub fn row_count(&self) -> usize {
112 match self {
113 Diff::Insert {
114 post,
115 ..
116 } => post.row_count(),
117 Diff::Update {
118 post,
119 ..
120 } => post.row_count(),
121 Diff::Remove {
122 pre,
123 ..
124 } => pre.row_count(),
125 }
126 }
127
128 pub fn origin(&self) -> Option<&ChangeOrigin> {
129 match self {
130 Diff::Insert {
131 origin,
132 ..
133 } => origin.as_ref(),
134 Diff::Update {
135 origin,
136 ..
137 } => origin.as_ref(),
138 Diff::Remove {
139 origin,
140 ..
141 } => origin.as_ref(),
142 }
143 }
144
145 pub fn set_origin(&mut self, new_origin: Option<ChangeOrigin>) {
146 match self {
147 Diff::Insert {
148 origin,
149 ..
150 } => *origin = new_origin,
151 Diff::Update {
152 origin,
153 ..
154 } => *origin = new_origin,
155 Diff::Remove {
156 origin,
157 ..
158 } => *origin = new_origin,
159 }
160 }
161
162 pub fn effective_origin<'a>(&'a self, parent: &'a ChangeOrigin) -> &'a ChangeOrigin {
163 self.origin().unwrap_or(parent)
164 }
165}
166
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct Change {
169 pub origin: ChangeOrigin,
170
171 pub diffs: Diffs,
172
173 pub version: CommitVersion,
174
175 pub changed_at: DateTime,
176}
177
178impl Change {
179 pub fn from_shape(
180 shape: ShapeId,
181 version: CommitVersion,
182 diffs: impl Into<Diffs>,
183 changed_at: DateTime,
184 ) -> Self {
185 Self {
186 origin: ChangeOrigin::Shape(shape),
187 diffs: diffs.into(),
188 version,
189 changed_at,
190 }
191 }
192
193 pub fn from_flow(
194 from: FlowNodeId,
195 version: CommitVersion,
196 diffs: impl Into<Diffs>,
197 changed_at: DateTime,
198 ) -> Self {
199 Self {
200 origin: ChangeOrigin::Flow(from),
201 diffs: diffs.into(),
202 version,
203 changed_at,
204 }
205 }
206
207 pub fn row_count(&self) -> usize {
208 self.diffs.iter().map(Diff::row_count).sum()
209 }
210
211 pub fn merge(changes: Vec<Change>) -> Result<Change> {
212 let mut iter = changes.into_iter();
213 let mut merged = iter.next().expect("Change::merge requires at least one Change");
214 for mut ch in iter {
215 if ch.changed_at > merged.changed_at {
216 merged.changed_at = ch.changed_at;
217 }
218 if ch.origin != merged.origin {
219 for diff in ch.diffs.iter_mut() {
220 if diff.origin().is_none() {
221 diff.set_origin(Some(ch.origin.clone()));
222 }
223 }
224 }
225 merged.diffs.extend(ch.diffs);
226 }
227 merged.coalesce()?;
228 Ok(merged)
229 }
230
231 pub fn coalesce(&mut self) -> Result<()> {
232 if self.diffs.len() <= 1 {
233 return Ok(());
234 }
235 let original = mem::take(&mut self.diffs);
236 let mut merged: Diffs = SmallVec::with_capacity(original.len());
237 for diff in original {
238 if diff.row_count() == 0 {
239 continue;
240 }
241 let same_kind_and_origin = match (merged.last(), &diff) {
242 (Some(last), next) => last.kind() == next.kind() && last.origin() == next.origin(),
243 _ => false,
244 };
245 if same_kind_and_origin {
246 let last = merged.last_mut().expect("non-empty by same_kind_and_origin branch");
247 merge_into(last, diff)?;
248 } else {
249 merged.push(diff);
250 }
251 }
252 self.diffs = merged;
253 Ok(())
254 }
255}
256
257fn merge_into(target: &mut Diff, source: Diff) -> Result<()> {
258 match (target, source) {
259 (
260 Diff::Insert {
261 post: t,
262 ..
263 },
264 Diff::Insert {
265 post: s,
266 ..
267 },
268 ) => t.append_all(s),
269 (
270 Diff::Update {
271 pre: tp,
272 post: tpost,
273 ..
274 },
275 Diff::Update {
276 pre: sp,
277 post: spost,
278 ..
279 },
280 ) => {
281 tp.append_all(sp)?;
282 tpost.append_all(spost)
283 }
284 (
285 Diff::Remove {
286 pre: t,
287 ..
288 },
289 Diff::Remove {
290 pre: s,
291 ..
292 },
293 ) => t.append_all(s),
294 _ => unreachable!("merge_into requires matching diff kinds"),
295 }
296}