reifydb_sdk/flow/
builder.rs1use reifydb_core::{
5 common::CommitVersion,
6 interface::{
7 catalog::flow::FlowNodeId,
8 change::{Change, Diff, Diffs},
9 },
10 row::Row,
11 value::column::columns::Columns,
12};
13use reifydb_type::{util::cowvec::CowVec, value::datetime::DateTime};
14
15pub struct ChangeBuilder {
16 operator_id: FlowNodeId,
17 version: CommitVersion,
18 diffs: Diffs,
19 changed_at: DateTime,
20}
21
22impl ChangeBuilder {
23 pub fn new(operator_id: FlowNodeId, version: CommitVersion) -> Self {
24 Self {
25 operator_id,
26 version,
27 diffs: Diffs::new(),
28 changed_at: DateTime::default(),
29 }
30 }
31
32 pub fn changed_at(mut self, changed_at: DateTime) -> Self {
33 self.changed_at = changed_at;
34 self
35 }
36
37 pub fn insert(mut self, post: Columns) -> Self {
38 self.diffs.push(Diff::insert(post));
39 self
40 }
41
42 pub fn insert_row(mut self, row: Row) -> Self {
43 self.diffs.push(Diff::insert(Columns::from_row(&row)));
44 self
45 }
46
47 pub fn update(mut self, pre: Columns, post: Columns) -> Self {
48 self.diffs.push(Diff::update(pre, post));
49 self
50 }
51
52 pub fn update_rows(mut self, pre: Row, post: Row) -> Self {
53 self.diffs.push(Diff::update(Columns::from_row(&pre), Columns::from_row(&post)));
54 self
55 }
56
57 pub fn remove(mut self, pre: Columns) -> Self {
58 self.diffs.push(Diff::remove(pre));
59 self
60 }
61
62 pub fn remove_row(mut self, row: Row) -> Self {
63 self.diffs.push(Diff::remove(Columns::from_row(&row)));
64 self
65 }
66
67 pub fn diff(mut self, diff: Diff) -> Self {
68 self.diffs.push(diff);
69 self
70 }
71
72 pub fn diffs(mut self, iter: impl IntoIterator<Item = Diff>) -> Self {
73 self.diffs.extend(iter);
74 self
75 }
76
77 pub fn build(self) -> Change {
78 let timestamp = self.changed_at;
79 let diffs: Diffs = self
80 .diffs
81 .into_iter()
82 .map(|diff| match diff {
83 Diff::Insert {
84 post,
85 } => Diff::insert(Self::ensure_timestamps((*post).clone(), timestamp)),
86 Diff::Update {
87 pre,
88 post,
89 } => Diff::update(
90 Self::ensure_timestamps((*pre).clone(), timestamp),
91 Self::ensure_timestamps((*post).clone(), timestamp),
92 ),
93 Diff::Remove {
94 pre,
95 } => Diff::remove(Self::ensure_timestamps((*pre).clone(), timestamp)),
96 })
97 .collect();
98 Change::from_flow(self.operator_id, self.version, diffs, self.changed_at)
99 }
100
101 fn ensure_timestamps(columns: Columns, timestamp: DateTime) -> Columns {
102 let row_count = columns.row_count();
103 if row_count > 0 && columns.created_at.is_empty() {
104 Columns {
105 created_at: CowVec::new(vec![timestamp; row_count]),
106 updated_at: CowVec::new(vec![timestamp; row_count]),
107 ..columns
108 }
109 } else {
110 columns
111 }
112 }
113}