Skip to main content

reifydb_sdk/flow/
builder.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	interface::{
7		catalog::flow::FlowNodeId,
8		change::{Change, Diff, Diffs},
9	},
10	row::Row,
11	value::column::columns::Columns,
12};
13use reifydb_type::{util::cowvec::CowVec, value::datetime::DateTime};
14
15pub struct ChangeBuilder {
16	operator_id: FlowNodeId,
17	version: CommitVersion,
18	diffs: Diffs,
19	changed_at: DateTime,
20}
21
22impl ChangeBuilder {
23	pub fn new(operator_id: FlowNodeId, version: CommitVersion) -> Self {
24		Self {
25			operator_id,
26			version,
27			diffs: Diffs::new(),
28			changed_at: DateTime::default(),
29		}
30	}
31
32	pub fn changed_at(mut self, changed_at: DateTime) -> Self {
33		self.changed_at = changed_at;
34		self
35	}
36
37	pub fn insert(mut self, post: Columns) -> Self {
38		self.diffs.push(Diff::insert(post));
39		self
40	}
41
42	pub fn insert_row(mut self, row: Row) -> Self {
43		self.diffs.push(Diff::insert(Columns::from_row(&row)));
44		self
45	}
46
47	pub fn update(mut self, pre: Columns, post: Columns) -> Self {
48		self.diffs.push(Diff::update(pre, post));
49		self
50	}
51
52	pub fn update_rows(mut self, pre: Row, post: Row) -> Self {
53		self.diffs.push(Diff::update(Columns::from_row(&pre), Columns::from_row(&post)));
54		self
55	}
56
57	pub fn remove(mut self, pre: Columns) -> Self {
58		self.diffs.push(Diff::remove(pre));
59		self
60	}
61
62	pub fn remove_row(mut self, row: Row) -> Self {
63		self.diffs.push(Diff::remove(Columns::from_row(&row)));
64		self
65	}
66
67	pub fn diff(mut self, diff: Diff) -> Self {
68		self.diffs.push(diff);
69		self
70	}
71
72	pub fn diffs(mut self, iter: impl IntoIterator<Item = Diff>) -> Self {
73		self.diffs.extend(iter);
74		self
75	}
76
77	pub fn build(self) -> Change {
78		let timestamp = self.changed_at;
79		let diffs: Diffs = self
80			.diffs
81			.into_iter()
82			.map(|diff| match diff {
83				Diff::Insert {
84					post,
85					..
86				} => Diff::insert(Self::ensure_timestamps((*post).clone(), timestamp)),
87				Diff::Update {
88					pre,
89					post,
90					..
91				} => Diff::update(
92					Self::ensure_timestamps((*pre).clone(), timestamp),
93					Self::ensure_timestamps((*post).clone(), timestamp),
94				),
95				Diff::Remove {
96					pre,
97					..
98				} => Diff::remove(Self::ensure_timestamps((*pre).clone(), timestamp)),
99			})
100			.collect();
101		Change::from_flow(self.operator_id, self.version, diffs, self.changed_at)
102	}
103
104	fn ensure_timestamps(columns: Columns, timestamp: DateTime) -> Columns {
105		let row_count = columns.row_count();
106		if row_count > 0 && columns.created_at.is_empty() {
107			Columns {
108				created_at: CowVec::new(vec![timestamp; row_count]),
109				updated_at: CowVec::new(vec![timestamp; row_count]),
110				..columns
111			}
112		} else {
113			columns
114		}
115	}
116}