Skip to main content

reifydb_sdk/flow/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	interface::{
7		catalog::flow::FlowNodeId,
8		change::{Change, Diff, Diffs},
9	},
10	row::Row,
11	value::column::columns::Columns,
12};
13use reifydb_type::{util::cowvec::CowVec, value::datetime::DateTime};
14
15pub struct ChangeBuilder {
16	operator_id: FlowNodeId,
17	version: CommitVersion,
18	diffs: Diffs,
19	changed_at: DateTime,
20}
21
22impl ChangeBuilder {
23	pub fn new(operator_id: FlowNodeId, version: CommitVersion) -> Self {
24		Self {
25			operator_id,
26			version,
27			diffs: Diffs::new(),
28			changed_at: DateTime::default(),
29		}
30	}
31
32	pub fn changed_at(mut self, changed_at: DateTime) -> Self {
33		self.changed_at = changed_at;
34		self
35	}
36
37	pub fn insert(mut self, post: Columns) -> Self {
38		self.diffs.push(Diff::insert(post));
39		self
40	}
41
42	pub fn insert_row(mut self, row: Row) -> Self {
43		self.diffs.push(Diff::insert(Columns::from_row(&row)));
44		self
45	}
46
47	pub fn update(mut self, pre: Columns, post: Columns) -> Self {
48		self.diffs.push(Diff::update(pre, post));
49		self
50	}
51
52	pub fn update_rows(mut self, pre: Row, post: Row) -> Self {
53		self.diffs.push(Diff::update(Columns::from_row(&pre), Columns::from_row(&post)));
54		self
55	}
56
57	pub fn remove(mut self, pre: Columns) -> Self {
58		self.diffs.push(Diff::remove(pre));
59		self
60	}
61
62	pub fn remove_row(mut self, row: Row) -> Self {
63		self.diffs.push(Diff::remove(Columns::from_row(&row)));
64		self
65	}
66
67	pub fn diff(mut self, diff: Diff) -> Self {
68		self.diffs.push(diff);
69		self
70	}
71
72	pub fn diffs(mut self, iter: impl IntoIterator<Item = Diff>) -> Self {
73		self.diffs.extend(iter);
74		self
75	}
76
77	pub fn build(self) -> Change {
78		let timestamp = self.changed_at;
79		let diffs: Diffs = self
80			.diffs
81			.into_iter()
82			.map(|diff| match diff {
83				Diff::Insert {
84					post,
85				} => Diff::insert(Self::ensure_timestamps((*post).clone(), timestamp)),
86				Diff::Update {
87					pre,
88					post,
89				} => Diff::update(
90					Self::ensure_timestamps((*pre).clone(), timestamp),
91					Self::ensure_timestamps((*post).clone(), timestamp),
92				),
93				Diff::Remove {
94					pre,
95				} => Diff::remove(Self::ensure_timestamps((*pre).clone(), timestamp)),
96			})
97			.collect();
98		Change::from_flow(self.operator_id, self.version, diffs, self.changed_at)
99	}
100
101	fn ensure_timestamps(columns: Columns, timestamp: DateTime) -> Columns {
102		let row_count = columns.row_count();
103		if row_count > 0 && columns.created_at.is_empty() {
104			Columns {
105				created_at: CowVec::new(vec![timestamp; row_count]),
106				updated_at: CowVec::new(vec![timestamp; row_count]),
107				..columns
108			}
109		} else {
110			columns
111		}
112	}
113}