reifydb_sdk/flow/
builder.rs1use reifydb_core::{
5 common::CommitVersion,
6 interface::{
7 catalog::flow::FlowNodeId,
8 change::{Change, Diff, Diffs},
9 },
10 row::Row,
11 value::column::columns::Columns,
12};
13use reifydb_type::{util::cowvec::CowVec, value::datetime::DateTime};
14
15pub struct ChangeBuilder {
16 operator_id: FlowNodeId,
17 version: CommitVersion,
18 diffs: Diffs,
19 changed_at: DateTime,
20}
21
22impl ChangeBuilder {
23 pub fn new(operator_id: FlowNodeId, version: CommitVersion) -> Self {
24 Self {
25 operator_id,
26 version,
27 diffs: Diffs::new(),
28 changed_at: DateTime::default(),
29 }
30 }
31
32 pub fn changed_at(mut self, changed_at: DateTime) -> Self {
33 self.changed_at = changed_at;
34 self
35 }
36
37 pub fn insert(mut self, post: Columns) -> Self {
38 self.diffs.push(Diff::insert(post));
39 self
40 }
41
42 pub fn insert_row(mut self, row: Row) -> Self {
43 self.diffs.push(Diff::insert(Columns::from_row(&row)));
44 self
45 }
46
47 pub fn update(mut self, pre: Columns, post: Columns) -> Self {
48 self.diffs.push(Diff::update(pre, post));
49 self
50 }
51
52 pub fn update_rows(mut self, pre: Row, post: Row) -> Self {
53 self.diffs.push(Diff::update(Columns::from_row(&pre), Columns::from_row(&post)));
54 self
55 }
56
57 pub fn remove(mut self, pre: Columns) -> Self {
58 self.diffs.push(Diff::remove(pre));
59 self
60 }
61
62 pub fn remove_row(mut self, row: Row) -> Self {
63 self.diffs.push(Diff::remove(Columns::from_row(&row)));
64 self
65 }
66
67 pub fn diff(mut self, diff: Diff) -> Self {
68 self.diffs.push(diff);
69 self
70 }
71
72 pub fn diffs(mut self, iter: impl IntoIterator<Item = Diff>) -> Self {
73 self.diffs.extend(iter);
74 self
75 }
76
77 pub fn build(self) -> Change {
78 let timestamp = self.changed_at;
79 let diffs: Diffs = self
80 .diffs
81 .into_iter()
82 .map(|diff| match diff {
83 Diff::Insert {
84 post,
85 ..
86 } => Diff::insert(Self::ensure_timestamps((*post).clone(), timestamp)),
87 Diff::Update {
88 pre,
89 post,
90 ..
91 } => Diff::update(
92 Self::ensure_timestamps((*pre).clone(), timestamp),
93 Self::ensure_timestamps((*post).clone(), timestamp),
94 ),
95 Diff::Remove {
96 pre,
97 ..
98 } => Diff::remove(Self::ensure_timestamps((*pre).clone(), timestamp)),
99 })
100 .collect();
101 Change::from_flow(self.operator_id, self.version, diffs, self.changed_at)
102 }
103
104 fn ensure_timestamps(columns: Columns, timestamp: DateTime) -> Columns {
105 let row_count = columns.row_count();
106 if row_count > 0 && columns.created_at.is_empty() {
107 Columns {
108 created_at: CowVec::new(vec![timestamp; row_count]),
109 updated_at: CowVec::new(vec![timestamp; row_count]),
110 ..columns
111 }
112 } else {
113 columns
114 }
115 }
116}