reifydb_catalog/materialized/
flow.rs1use reifydb_core::{
5 CommitVersion,
6 interface::{FlowDef, FlowId, NamespaceId},
7};
8
9use crate::materialized::{MaterializedCatalog, MultiVersionFlowDef};
10
11impl MaterializedCatalog {
12 pub fn find_flow(&self, flow: FlowId, version: CommitVersion) -> Option<FlowDef> {
14 self.flows.get(&flow).and_then(|entry| {
15 let multi = entry.value();
16 multi.get(version)
17 })
18 }
19
20 pub fn find_flow_by_name(&self, namespace: NamespaceId, name: &str, version: CommitVersion) -> Option<FlowDef> {
22 self.flows_by_name.get(&(namespace, name.to_string())).and_then(|entry| {
23 let flow_id = *entry.value();
24 self.find_flow(flow_id, version)
25 })
26 }
27
28 pub fn set_flow(&self, id: FlowId, version: CommitVersion, flow: Option<FlowDef>) {
29 if let Some(entry) = self.flows.get(&id) {
31 if let Some(pre) = entry.value().get_latest() {
32 self.flows_by_name.remove(&(pre.namespace, pre.name.clone()));
33 }
34 }
35
36 let multi = self.flows.get_or_insert_with(id, MultiVersionFlowDef::new);
37 if let Some(new) = flow {
38 self.flows_by_name.insert((new.namespace, new.name.clone()), id);
39 multi.value().insert(version, new);
40 } else {
41 multi.value().remove(version);
42 }
43 }
44}
45
#[cfg(test)]
mod tests {
    use reifydb_core::interface::FlowStatus;

    use super::*;

    /// Builds an active `FlowDef` with the given id, namespace and name.
    fn create_test_flow(id: FlowId, namespace: NamespaceId, name: &str) -> FlowDef {
        let name = name.to_string();
        FlowDef {
            id,
            namespace,
            name,
            status: FlowStatus::Active,
        }
    }

    #[test]
    fn test_set_and_find_flow() {
        let catalog = MaterializedCatalog::new();
        let id = FlowId(1);
        let def = create_test_flow(id, NamespaceId(1), "test_flow");

        catalog.set_flow(id, CommitVersion(1), Some(def.clone()));

        // Found at the version it was set and at a later version.
        assert_eq!(catalog.find_flow(id, CommitVersion(1)), Some(def.clone()));
        assert_eq!(catalog.find_flow(id, CommitVersion(5)), Some(def));

        // Not found at a version before it was set.
        assert_eq!(catalog.find_flow(id, CommitVersion(0)), None);
    }

    #[test]
    fn test_find_flow_by_name() {
        let catalog = MaterializedCatalog::new();
        let id = FlowId(1);
        let ns = NamespaceId(1);
        let def = create_test_flow(id, ns, "named_flow");
        let at = CommitVersion(1);

        catalog.set_flow(id, at, Some(def.clone()));

        // Exact namespace + name match.
        assert_eq!(catalog.find_flow_by_name(ns, "named_flow", at), Some(def));

        // Unknown name in the right namespace.
        assert_eq!(catalog.find_flow_by_name(ns, "wrong_name", at), None);

        // Right name in a different namespace.
        assert_eq!(catalog.find_flow_by_name(NamespaceId(2), "named_flow", at), None);
    }

    #[test]
    fn test_flow_rename() {
        let catalog = MaterializedCatalog::new();
        let id = FlowId(1);
        let ns = NamespaceId(1);

        let original = create_test_flow(id, ns, "old_name");
        catalog.set_flow(id, CommitVersion(1), Some(original.clone()));

        assert!(catalog.find_flow_by_name(ns, "old_name", CommitVersion(1)).is_some());
        assert!(catalog.find_flow_by_name(ns, "new_name", CommitVersion(1)).is_none());

        // Same flow, new name, committed at a later version.
        let renamed = FlowDef {
            name: "new_name".to_string(),
            ..original.clone()
        };
        catalog.set_flow(id, CommitVersion(2), Some(renamed.clone()));

        // The old name no longer resolves.
        assert!(catalog.find_flow_by_name(ns, "old_name", CommitVersion(2)).is_none());

        // The new name resolves to the renamed definition.
        assert_eq!(
            catalog.find_flow_by_name(ns, "new_name", CommitVersion(2)),
            Some(renamed.clone())
        );

        // Lookups by id still return the definition set at each version.
        assert_eq!(catalog.find_flow(id, CommitVersion(1)), Some(original));
        assert_eq!(catalog.find_flow(id, CommitVersion(2)), Some(renamed));
    }

    #[test]
    fn test_flow_move_between_namespaces() {
        let catalog = MaterializedCatalog::new();
        let id = FlowId(1);
        let source_ns = NamespaceId(1);
        let target_ns = NamespaceId(2);
        let name = "movable_flow";

        let before = create_test_flow(id, source_ns, name);
        catalog.set_flow(id, CommitVersion(1), Some(before.clone()));

        assert!(catalog.find_flow_by_name(source_ns, name, CommitVersion(1)).is_some());
        assert!(catalog.find_flow_by_name(target_ns, name, CommitVersion(1)).is_none());

        // Same flow, relocated to the second namespace.
        let after = FlowDef {
            namespace: target_ns,
            ..before.clone()
        };
        catalog.set_flow(id, CommitVersion(2), Some(after));

        // Gone from the source namespace, present in the target namespace.
        assert!(catalog.find_flow_by_name(source_ns, name, CommitVersion(2)).is_none());
        assert!(catalog.find_flow_by_name(target_ns, name, CommitVersion(2)).is_some());
    }

    #[test]
    fn test_flow_deletion() {
        let catalog = MaterializedCatalog::new();
        let id = FlowId(1);
        let ns = NamespaceId(1);
        let name = "deletable_flow";

        let def = create_test_flow(id, ns, name);
        catalog.set_flow(id, CommitVersion(1), Some(def.clone()));

        assert_eq!(catalog.find_flow(id, CommitVersion(1)), Some(def.clone()));
        assert!(catalog.find_flow_by_name(ns, name, CommitVersion(1)).is_some());

        // Setting `None` at a later version deletes the flow from then on.
        catalog.set_flow(id, CommitVersion(2), None);

        assert_eq!(catalog.find_flow(id, CommitVersion(2)), None);
        assert!(catalog.find_flow_by_name(ns, name, CommitVersion(2)).is_none());

        // The earlier version is still readable by id.
        assert_eq!(catalog.find_flow(id, CommitVersion(1)), Some(def));
    }

    #[test]
    fn test_multiple_flows_in_namespace() {
        let catalog = MaterializedCatalog::new();
        let ns = NamespaceId(1);

        let defs = [(1, "flow1"), (2, "flow2"), (3, "flow3")]
            .map(|(raw_id, name)| create_test_flow(FlowId(raw_id), ns, name));

        // Register all flows first, then verify each one by name.
        for def in &defs {
            catalog.set_flow(def.id, CommitVersion(1), Some(def.clone()));
        }

        for def in &defs {
            assert_eq!(
                catalog.find_flow_by_name(ns, &def.name, CommitVersion(1)),
                Some(def.clone())
            );
        }
    }

    #[test]
    fn test_flow_versioning() {
        let catalog = MaterializedCatalog::new();
        let id = FlowId(1);

        let flow_v1 = create_test_flow(id, NamespaceId(1), "flow_v1");
        let flow_v2 = FlowDef {
            name: "flow_v2".to_string(),
            ..flow_v1.clone()
        };
        let flow_v3 = FlowDef {
            name: "flow_v3".to_string(),
            ..flow_v2.clone()
        };

        catalog.set_flow(id, CommitVersion(10), Some(flow_v1.clone()));
        catalog.set_flow(id, CommitVersion(20), Some(flow_v2.clone()));
        catalog.set_flow(id, CommitVersion(30), Some(flow_v3.clone()));

        // (queried version, definition expected to be visible there)
        let expectations = [
            (5, None),
            (10, Some(flow_v1.clone())),
            (15, Some(flow_v1)),
            (20, Some(flow_v2.clone())),
            (25, Some(flow_v2)),
            (30, Some(flow_v3.clone())),
            (100, Some(flow_v3)),
        ];

        for (at, expected) in expectations {
            assert_eq!(catalog.find_flow(id, CommitVersion(at)), expected);
        }
    }
}