reifydb_catalog/materialized/
flow.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	CommitVersion,
6	interface::{FlowDef, FlowId, NamespaceId},
7};
8
9use crate::materialized::{MaterializedCatalog, MultiVersionFlowDef};
10
11impl MaterializedCatalog {
12	/// Find a flow by ID at a specific version
13	pub fn find_flow(&self, flow: FlowId, version: CommitVersion) -> Option<FlowDef> {
14		self.flows.get(&flow).and_then(|entry| {
15			let multi = entry.value();
16			multi.get(version)
17		})
18	}
19
20	/// Find a flow by name in a namespace at a specific version
21	pub fn find_flow_by_name(&self, namespace: NamespaceId, name: &str, version: CommitVersion) -> Option<FlowDef> {
22		self.flows_by_name.get(&(namespace, name.to_string())).and_then(|entry| {
23			let flow_id = *entry.value();
24			self.find_flow(flow_id, version)
25		})
26	}
27
28	pub fn set_flow(&self, id: FlowId, version: CommitVersion, flow: Option<FlowDef>) {
29		// Look up the current flow to update the index
30		if let Some(entry) = self.flows.get(&id) {
31			if let Some(pre) = entry.value().get_latest() {
32				self.flows_by_name.remove(&(pre.namespace, pre.name.clone()));
33			}
34		}
35
36		let multi = self.flows.get_or_insert_with(id, MultiVersionFlowDef::new);
37		if let Some(new) = flow {
38			self.flows_by_name.insert((new.namespace, new.name.clone()), id);
39			multi.value().insert(version, new);
40		} else {
41			multi.value().remove(version);
42		}
43	}
44}
45
46#[cfg(test)]
47mod tests {
48	use reifydb_core::interface::FlowStatus;
49
50	use super::*;
51
52	fn create_test_flow(id: FlowId, namespace: NamespaceId, name: &str) -> FlowDef {
53		FlowDef {
54			id,
55			namespace,
56			name: name.to_string(),
57			status: FlowStatus::Active,
58		}
59	}
60
61	#[test]
62	fn test_set_and_find_flow() {
63		let catalog = MaterializedCatalog::new();
64		let flow_id = FlowId(1);
65		let namespace_id = NamespaceId(1);
66		let flow = create_test_flow(flow_id, namespace_id, "test_flow");
67
68		// Set flow at version 1
69		catalog.set_flow(flow_id, CommitVersion(1), Some(flow.clone()));
70
71		// Find flow at version 1
72		let found = catalog.find_flow(flow_id, CommitVersion(1));
73		assert_eq!(found, Some(flow.clone()));
74
75		// Find flow at later version (should return same flow)
76		let found = catalog.find_flow(flow_id, CommitVersion(5));
77		assert_eq!(found, Some(flow));
78
79		// Flow shouldn't exist at version 0
80		let found = catalog.find_flow(flow_id, CommitVersion(0));
81		assert_eq!(found, None);
82	}
83
84	#[test]
85	fn test_find_flow_by_name() {
86		let catalog = MaterializedCatalog::new();
87		let flow_id = FlowId(1);
88		let namespace_id = NamespaceId(1);
89		let flow = create_test_flow(flow_id, namespace_id, "named_flow");
90
91		// Set flow
92		catalog.set_flow(flow_id, CommitVersion(1), Some(flow.clone()));
93
94		// Find by name
95		let found = catalog.find_flow_by_name(namespace_id, "named_flow", CommitVersion(1));
96		assert_eq!(found, Some(flow));
97
98		// Shouldn't find with wrong name
99		let found = catalog.find_flow_by_name(namespace_id, "wrong_name", CommitVersion(1));
100		assert_eq!(found, None);
101
102		// Shouldn't find in wrong namespace
103		let found = catalog.find_flow_by_name(NamespaceId(2), "named_flow", CommitVersion(1));
104		assert_eq!(found, None);
105	}
106
107	#[test]
108	fn test_flow_rename() {
109		let catalog = MaterializedCatalog::new();
110		let flow_id = FlowId(1);
111		let namespace_id = NamespaceId(1);
112
113		// Create and set initial flow
114		let flow_v1 = create_test_flow(flow_id, namespace_id, "old_name");
115		catalog.set_flow(flow_id, CommitVersion(1), Some(flow_v1.clone()));
116
117		// Verify initial state
118		assert!(catalog.find_flow_by_name(namespace_id, "old_name", CommitVersion(1)).is_some());
119		assert!(catalog.find_flow_by_name(namespace_id, "new_name", CommitVersion(1)).is_none());
120
121		// Rename the flow
122		let mut flow_v2 = flow_v1.clone();
123		flow_v2.name = "new_name".to_string();
124		catalog.set_flow(flow_id, CommitVersion(2), Some(flow_v2.clone()));
125
126		// Old name should be gone
127		assert!(catalog.find_flow_by_name(namespace_id, "old_name", CommitVersion(2)).is_none());
128
129		// New name can be found
130		assert_eq!(
131			catalog.find_flow_by_name(namespace_id, "new_name", CommitVersion(2)),
132			Some(flow_v2.clone())
133		);
134
135		// Historical query at version 1 should still show old name
136		assert_eq!(catalog.find_flow(flow_id, CommitVersion(1)), Some(flow_v1));
137
138		// Current version should show new name
139		assert_eq!(catalog.find_flow(flow_id, CommitVersion(2)), Some(flow_v2));
140	}
141
142	#[test]
143	fn test_flow_move_between_namespaces() {
144		let catalog = MaterializedCatalog::new();
145		let flow_id = FlowId(1);
146		let namespace1 = NamespaceId(1);
147		let namespace2 = NamespaceId(2);
148
149		// Create flow in namespace1
150		let flow_v1 = create_test_flow(flow_id, namespace1, "movable_flow");
151		catalog.set_flow(flow_id, CommitVersion(1), Some(flow_v1.clone()));
152
153		// Verify it's in namespace1
154		assert!(catalog.find_flow_by_name(namespace1, "movable_flow", CommitVersion(1)).is_some());
155		assert!(catalog.find_flow_by_name(namespace2, "movable_flow", CommitVersion(1)).is_none());
156
157		// Move to namespace2
158		let mut flow_v2 = flow_v1.clone();
159		flow_v2.namespace = namespace2;
160		catalog.set_flow(flow_id, CommitVersion(2), Some(flow_v2.clone()));
161
162		// Should no longer be in namespace1
163		assert!(catalog.find_flow_by_name(namespace1, "movable_flow", CommitVersion(2)).is_none());
164
165		// Should now be in namespace2
166		assert!(catalog.find_flow_by_name(namespace2, "movable_flow", CommitVersion(2)).is_some());
167	}
168
169	#[test]
170	fn test_flow_deletion() {
171		let catalog = MaterializedCatalog::new();
172		let flow_id = FlowId(1);
173		let namespace_id = NamespaceId(1);
174
175		// Create and set flow
176		let flow = create_test_flow(flow_id, namespace_id, "deletable_flow");
177		catalog.set_flow(flow_id, CommitVersion(1), Some(flow.clone()));
178
179		// Verify it exists
180		assert_eq!(catalog.find_flow(flow_id, CommitVersion(1)), Some(flow.clone()));
181		assert!(catalog.find_flow_by_name(namespace_id, "deletable_flow", CommitVersion(1)).is_some());
182
183		// Delete the flow
184		catalog.set_flow(flow_id, CommitVersion(2), None);
185
186		// Should not exist at version 2
187		assert_eq!(catalog.find_flow(flow_id, CommitVersion(2)), None);
188		assert!(catalog.find_flow_by_name(namespace_id, "deletable_flow", CommitVersion(2)).is_none());
189
190		// Should still exist at version 1 (historical)
191		assert_eq!(catalog.find_flow(flow_id, CommitVersion(1)), Some(flow));
192	}
193
194	#[test]
195	fn test_multiple_flows_in_namespace() {
196		let catalog = MaterializedCatalog::new();
197		let namespace_id = NamespaceId(1);
198
199		let flow1 = create_test_flow(FlowId(1), namespace_id, "flow1");
200		let flow2 = create_test_flow(FlowId(2), namespace_id, "flow2");
201		let flow3 = create_test_flow(FlowId(3), namespace_id, "flow3");
202
203		// Set multiple flows
204		catalog.set_flow(FlowId(1), CommitVersion(1), Some(flow1.clone()));
205		catalog.set_flow(FlowId(2), CommitVersion(1), Some(flow2.clone()));
206		catalog.set_flow(FlowId(3), CommitVersion(1), Some(flow3.clone()));
207
208		// All should be findable
209		assert_eq!(catalog.find_flow_by_name(namespace_id, "flow1", CommitVersion(1)), Some(flow1));
210		assert_eq!(catalog.find_flow_by_name(namespace_id, "flow2", CommitVersion(1)), Some(flow2));
211		assert_eq!(catalog.find_flow_by_name(namespace_id, "flow3", CommitVersion(1)), Some(flow3));
212	}
213
214	#[test]
215	fn test_flow_versioning() {
216		let catalog = MaterializedCatalog::new();
217		let flow_id = FlowId(1);
218		let namespace_id = NamespaceId(1);
219
220		// Create multiple versions
221		let flow_v1 = create_test_flow(flow_id, namespace_id, "flow_v1");
222		let mut flow_v2 = flow_v1.clone();
223		flow_v2.name = "flow_v2".to_string();
224		let mut flow_v3 = flow_v2.clone();
225		flow_v3.name = "flow_v3".to_string();
226
227		// Set at different versions
228		catalog.set_flow(flow_id, CommitVersion(10), Some(flow_v1.clone()));
229		catalog.set_flow(flow_id, CommitVersion(20), Some(flow_v2.clone()));
230		catalog.set_flow(flow_id, CommitVersion(30), Some(flow_v3.clone()));
231
232		// Query at different versions
233		assert_eq!(catalog.find_flow(flow_id, CommitVersion(5)), None);
234		assert_eq!(catalog.find_flow(flow_id, CommitVersion(10)), Some(flow_v1.clone()));
235		assert_eq!(catalog.find_flow(flow_id, CommitVersion(15)), Some(flow_v1));
236		assert_eq!(catalog.find_flow(flow_id, CommitVersion(20)), Some(flow_v2.clone()));
237		assert_eq!(catalog.find_flow(flow_id, CommitVersion(25)), Some(flow_v2));
238		assert_eq!(catalog.find_flow(flow_id, CommitVersion(30)), Some(flow_v3.clone()));
239		assert_eq!(catalog.find_flow(flow_id, CommitVersion(100)), Some(flow_v3));
240	}
241}