1use crate::prelude::*;
14
15#[cfg(feature = "persistence")]
16use crate::execution::db_tracking_setup;
17#[cfg(feature = "persistence")]
18use crate::ops::get_target_factory;
19#[cfg(feature = "persistence")]
20use crate::ops::interface::SetupStateCompatibility;
21
/// Per-import-op result of setup planning.
pub struct ImportOpExecutionContext {
    /// Source id resolved for the import op: either reused from a matching
    /// existing source state or freshly allocated (see `build_import_op_exec_ctx`).
    pub source_id: i32,
}
25
/// Per-target result of setup planning.
pub struct ExportOpExecutionContext {
    /// Target id resolved for the target: either reused from compatible
    /// existing target states or freshly allocated (see `build_export_op_exec_ctx`).
    pub target_id: i32,
    /// Schema version id resolved for the target: either reused from existing
    /// states or one past the highest version id seen so far.
    pub schema_version_id: usize,
}
30
/// Everything `build_flow_setup_execution_context` produces for a flow.
#[cfg(feature = "persistence")]
pub struct FlowSetupExecutionContext {
    /// Desired setup state of the whole flow (tracking table, targets, metadata).
    pub setup_state: setup::FlowSetupState<setup::DesiredMode>,
    /// One entry per import op of the flow instance, in the same order.
    pub import_ops: Vec<ImportOpExecutionContext>,
    /// One entry per analyzed target, in the same order.
    pub export_ops: Vec<ExportOpExecutionContext>,
}
37
38pub struct AnalyzedTargetSetupState {
39 pub target_kind: String,
40 pub setup_key: serde_json::Value,
41 pub desired_setup_state: serde_json::Value,
42 pub setup_by_user: bool,
43 pub key_type: Option<Box<[schema::ValueType]>>,
45
46 pub attachments: IndexMap<interface::AttachmentSetupKey, serde_json::Value>,
47}
48
49pub struct AnalyzedSetupState {
50 pub targets: Vec<AnalyzedTargetSetupState>,
51 pub declarations: Vec<AnalyzedTargetSetupState>,
52}
53
/// Resolves the source id for one import op and records its setup state.
///
/// The key schema of the op's output type (with attributes stripped) is
/// compared against the key schema of every state in `existing_source_states`.
/// If exactly one distinct source id matches, it is reused; otherwise
/// `metadata.last_source_id` is incremented and the new value is used.
///
/// In all cases a `SourceSetupState` for the op is inserted into
/// `metadata.sources` under the op's name.
#[cfg(feature = "persistence")]
fn build_import_op_exec_ctx(
    import_op: &spec::NamedSpec<spec::ImportOpSpec>,
    import_op_output_type: &schema::EnrichedValueType,
    existing_source_states: Option<&Vec<&setup::SourceSetupState>>,
    metadata: &mut setup::FlowSetupMetadata,
) -> Result<ImportOpExecutionContext> {
    // Key schema of the op as it will be persisted: value types only, no attributes.
    let desired_keys: Box<[_]> = import_op_output_type
        .typ
        .key_schema()
        .iter()
        .map(|field| field.value_type.typ.without_attrs())
        .collect();

    // Collect the distinct source ids whose recorded key schema equals ours.
    let mut matching_source_ids = HashSet::new();
    for state in existing_source_states.into_iter().flatten() {
        let existing_keys_schema: &[schema::ValueType] =
            if let Some(keys_schema) = &state.keys_schema {
                keys_schema
            } else {
                // No multi-key schema recorded: fall back to the single legacy
                // key schema when that feature is compiled in, else to "no keys".
                #[cfg(feature = "legacy-states-v0")]
                if let Some(key_schema) = &state.key_schema {
                    std::slice::from_ref(key_schema)
                } else {
                    &[]
                }
                #[cfg(not(feature = "legacy-states-v0"))]
                &[]
            };
        if existing_keys_schema == desired_keys.as_ref() {
            matching_source_ids.insert(state.source_id);
        }
    }

    let source_id = match matching_source_ids.len() {
        // Unambiguous match: keep the existing id.
        1 => matching_source_ids.into_iter().next().unwrap(),
        match_count => {
            if match_count > 1 {
                warn!("Multiple source states with the same key schema found");
            }
            // Zero or ambiguous matches: allocate a fresh id.
            metadata.last_source_id += 1;
            metadata.last_source_id
        }
    };

    // Legacy single-value form of the key schema: the key itself when there is
    // exactly one, otherwise a struct built from the key fields.
    #[cfg(feature = "legacy-states-v0")]
    let legacy_key_schema = match &*desired_keys {
        [single_key] => single_key.clone(),
        _ => schema::ValueType::Struct(schema::StructSchema {
            fields: Arc::new(
                import_op_output_type
                    .typ
                    .key_schema()
                    .iter()
                    .map(|field| {
                        schema::FieldSchema::new(field.name.clone(), field.value_type.clone())
                    })
                    .collect(),
            ),
            description: None,
        }),
    };

    let source_state = setup::SourceSetupState {
        source_id,

        #[cfg(feature = "legacy-states-v0")]
        key_schema: Some(legacy_key_schema),
        keys_schema: Some(desired_keys),
        source_kind: import_op.spec.source.kind.clone(),
    };
    metadata.sources.insert(import_op.name.clone(), source_state);

    Ok(ImportOpExecutionContext { source_id })
}
135
/// Resolves the target id and schema version id for one analyzed target and
/// registers its desired state in `target_states`.
///
/// Every existing state recorded for the same resource identifier is classified
/// as not compatible, or as some degree of compatible (via the target factory).
/// * The target id is reused only when all existing states agree on a single
///   compatible id; otherwise `metadata.last_target_id` is incremented and used.
/// * The schema version id is reused only when all non-incompatible states are
///   fully `Compatible` and share one version id; otherwise it becomes one past
///   the highest `max_schema_version_id` among the existing states (or 1 when
///   there are none).
///
/// # Errors
/// Fails when the target factory cannot be obtained, when the compatibility
/// check fails, or when `target_states` already holds the same resource identifier.
#[cfg(feature = "persistence")]
fn build_export_op_exec_ctx(
    analyzed_target_ss: &AnalyzedTargetSetupState,
    existing_target_states: &HashMap<&setup::ResourceIdentifier, Vec<&setup::TargetSetupState>>,
    metadata: &mut setup::FlowSetupMetadata,
    target_states: &mut IndexMap<setup::ResourceIdentifier, setup::TargetSetupState>,
) -> Result<ExportOpExecutionContext> {
    let target_factory = get_target_factory(&analyzed_target_ss.target_kind)?;

    let resource_id = setup::ResourceIdentifier {
        key: analyzed_target_ss.setup_key.clone(),
        target_kind: analyzed_target_ss.target_kind.clone(),
    };
    // Existing states for this resource; an empty slice when none are recorded.
    let prior_states: &[&setup::TargetSetupState] = existing_target_states
        .get(&resource_id)
        .map(|states| states.as_slice())
        .unwrap_or_default();

    // `None` entries mark "some existing state is not compatible" (target ids)
    // or "compatible, but not fully" (schema version ids).
    let mut compatible_target_ids = HashSet::<Option<i32>>::new();
    let mut reusable_schema_version_ids = HashSet::<Option<usize>>::new();
    for existing_state in prior_states {
        let existing_common = &existing_state.common;

        // Key types conflict only when both sides declare one and they differ.
        let key_type_conflicts = match (&analyzed_target_ss.key_type, &existing_common.key_type) {
            (Some(desired_key_type), Some(existing_key_type)) => {
                desired_key_type != existing_key_type
            }
            _ => false,
        };
        let compatibility = if key_type_conflicts
            || analyzed_target_ss.setup_by_user != existing_common.setup_by_user
        {
            SetupStateCompatibility::NotCompatible
        } else {
            target_factory.check_state_compatibility(
                &analyzed_target_ss.desired_setup_state,
                &existing_state.state,
            )?
        };

        if compatibility == SetupStateCompatibility::NotCompatible {
            compatible_target_ids.insert(None);
            continue;
        }
        let reusable_version = if compatibility == SetupStateCompatibility::Compatible {
            Some(existing_common.schema_version_id)
        } else {
            None
        };
        reusable_schema_version_ids.insert(reusable_version);
        compatible_target_ids.insert(Some(existing_common.target_id));
    }

    let reused_target_id = match compatible_target_ids.len() {
        // A single entry: either the one agreed-upon id, or `None` if the only
        // verdict was "not compatible".
        1 => compatible_target_ids.into_iter().next().flatten(),
        distinct_count => {
            if distinct_count > 1 {
                // NOTE(review): this branch is reached when existing states
                // disagree (distinct target ids, or a mix of compatible and
                // incompatible); the wording mirrors the source-side warning.
                warn!("Multiple target states with the same key schema found");
            }
            None
        }
    };
    let target_id = match reused_target_id {
        Some(id) => id,
        None => {
            metadata.last_target_id += 1;
            metadata.last_target_id
        }
    };

    let max_schema_version_id = prior_states
        .iter()
        .map(|state| state.common.max_schema_version_id)
        .max()
        .unwrap_or(0);
    let next_schema_version_id = max_schema_version_id + 1;
    let schema_version_id = if reusable_schema_version_ids.len() == 1 {
        reusable_schema_version_ids
            .into_iter()
            .next()
            .flatten()
            .unwrap_or(next_schema_version_id)
    } else {
        next_schema_version_id
    };

    let desired_state = setup::TargetSetupState {
        common: setup::TargetSetupStateCommon {
            target_id,
            schema_version_id,
            max_schema_version_id: max_schema_version_id.max(schema_version_id),
            setup_by_user: analyzed_target_ss.setup_by_user,
            key_type: analyzed_target_ss.key_type.clone(),
        },
        state: analyzed_target_ss.desired_setup_state.clone(),
        attachments: analyzed_target_ss.attachments.clone(),
    };
    match target_states.entry(resource_id) {
        indexmap::map::Entry::Vacant(slot) => {
            slot.insert(desired_state);
        }
        indexmap::map::Entry::Occupied(taken) => {
            api_bail!(
                "Target resource already exists: kind = {}, key = {}",
                taken.key().target_kind,
                taken.key().key
            );
        }
    }

    Ok(ExportOpExecutionContext {
        target_id,
        schema_version_id,
    })
}
233
/// Builds the desired setup state and per-op execution contexts for a flow.
///
/// Steps:
/// 1. Index existing source states by source name and existing target states
///    by resource identifier (both empty when `existing_flow_ss` is `None`).
/// 2. Seed a fresh `FlowSetupMetadata` with the highest source/target ids seen
///    in any existing metadata version, and with the union of existing
///    features (or the default features when there is no existing state).
/// 3. Resolve every import op, every analyzed target and every declaration, in
///    that order; declarations are registered but yield no export context.
/// 4. Assemble the tracking table state, reusing existing table names when
///    available and falling back to the defaults derived from the flow name.
///
/// # Errors
/// Fails when an import op has no entry in the root scope's output types, or
/// when resolving any import op, target or declaration fails.
#[cfg(feature = "persistence")]
pub fn build_flow_setup_execution_context(
    flow_inst: &spec::FlowInstanceSpec,
    data_schema: &schema::FlowSchema,
    analyzed_ss: &AnalyzedSetupState,
    existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
) -> Result<FlowSetupExecutionContext> {
    // All metadata versions that may currently be in effect (none without existing state).
    let prior_metadata_versions = || {
        existing_flow_ss
            .iter()
            .flat_map(|flow_ss| flow_ss.metadata.possible_versions())
    };

    let mut source_states_by_name = HashMap::<&str, Vec<&setup::SourceSetupState>>::new();
    for (source_name, state) in prior_metadata_versions().flat_map(|version| version.sources.iter())
    {
        source_states_by_name
            .entry(source_name.as_str())
            .or_default()
            .push(state);
    }

    let mut target_states_by_name_type =
        HashMap::<&setup::ResourceIdentifier, Vec<&setup::TargetSetupState>>::new();
    if let Some(flow_ss) = existing_flow_ss {
        for (resource_id, target) in flow_ss.targets.iter() {
            target_states_by_name_type
                .entry(resource_id)
                .or_default()
                .extend(target.possible_versions());
        }
    }

    // Continue id allocation from the highest ids any existing version has used.
    let last_source_id = prior_metadata_versions()
        .map(|version| version.last_source_id)
        .max()
        .unwrap_or(0);
    let last_target_id = prior_metadata_versions()
        .map(|version| version.last_target_id)
        .max()
        .unwrap_or(0);
    let features = match existing_flow_ss {
        Some(flow_ss) => flow_ss
            .metadata
            .possible_versions()
            .flat_map(|version| version.features.iter())
            .cloned()
            .collect::<BTreeSet<_>>(),
        None => setup::flow_features::default_features(),
    };
    let mut metadata = setup::FlowSetupMetadata {
        last_source_id,
        last_target_id,
        sources: BTreeMap::new(),
        features,
    };
    let mut target_states = IndexMap::new();

    let mut import_op_exec_ctx = Vec::with_capacity(flow_inst.import_ops.len());
    for import_op in flow_inst.import_ops.iter() {
        let output_type = data_schema
            .root_op_scope
            .op_output_types
            .get(&import_op.name)
            .ok_or_else(invariance_violation)?;
        import_op_exec_ctx.push(build_import_op_exec_ctx(
            import_op,
            output_type,
            source_states_by_name.get(&import_op.name.as_str()),
            &mut metadata,
        )?);
    }

    let mut export_op_exec_ctx = Vec::with_capacity(analyzed_ss.targets.len());
    for analyzed_target_ss in analyzed_ss.targets.iter() {
        export_op_exec_ctx.push(build_export_op_exec_ctx(
            analyzed_target_ss,
            &target_states_by_name_type,
            &mut metadata,
            &mut target_states,
        )?);
    }

    // Declarations only contribute target states; their contexts are discarded.
    for analyzed_declaration in analyzed_ss.declarations.iter() {
        build_export_op_exec_ctx(
            analyzed_declaration,
            &target_states_by_name_type,
            &mut metadata,
            &mut target_states,
        )?;
    }

    let current_tracking_table =
        existing_flow_ss.and_then(|flow_ss| flow_ss.tracking_table.current.as_ref());
    let table_name = current_tracking_table
        .map(|table| table.table_name.clone())
        .unwrap_or_else(|| db_tracking_setup::default_tracking_table_name(&flow_inst.name));
    // The source state table only exists when the corresponding feature is on.
    let source_state_table_name = if metadata
        .features
        .contains(setup::flow_features::SOURCE_STATE_TABLE)
    {
        Some(
            current_tracking_table
                .and_then(|table| table.source_state_table_name.clone())
                .unwrap_or_else(|| {
                    db_tracking_setup::default_source_state_table_name(&flow_inst.name)
                }),
        )
    } else {
        None
    };
    let has_fast_fingerprint_column = metadata
        .features
        .contains(setup::flow_features::FAST_FINGERPRINT);

    let setup_state = setup::FlowSetupState::<setup::DesiredMode> {
        seen_flow_metadata_version: existing_flow_ss
            .and_then(|flow_ss| flow_ss.seen_flow_metadata_version),
        tracking_table: db_tracking_setup::TrackingTableSetupState {
            table_name,
            version_id: db_tracking_setup::CURRENT_TRACKING_TABLE_VERSION,
            source_state_table_name,
            has_fast_fingerprint_column,
        },
        targets: target_states,
        metadata,
    };
    Ok(FlowSetupExecutionContext {
        setup_state,
        import_ops: import_op_exec_ctx,
        export_ops: export_op_exec_ctx,
    })
}