1use crate::prelude::*;
14use crate::setup::ResourceSetupChange;
15use std::fmt::Debug;
16use std::hash::Hash;
17
18use super::interface::*;
19use super::registry::*;
20use crate::base::schema::*;
21use crate::base::spec::*;
22use crate::builder::plan::AnalyzedValueMapping;
23use crate::setup;
24
25pub struct OpArgResolver<'arg> {
30 name: String,
31 resolved_op_arg: Option<(usize, EnrichedValueType)>,
32 nonnull_args_idx: &'arg mut Vec<usize>,
33 may_nullify_output: &'arg mut bool,
34}
35
36impl<'arg> OpArgResolver<'arg> {
37 pub fn expect_nullable_type(self, expected_type: &ValueType) -> Result<Self> {
38 let Some((_, typ)) = &self.resolved_op_arg else {
39 return Ok(self);
40 };
41 if &typ.typ != expected_type {
42 api_bail!(
43 "Expected argument `{}` to be of type `{}`, got `{}`",
44 self.name,
45 expected_type,
46 typ.typ
47 );
48 }
49 Ok(self)
50 }
51 pub fn expect_type(self, expected_type: &ValueType) -> Result<Self> {
52 let resolver = self.expect_nullable_type(expected_type)?;
53 if let Some((idx, typ)) = resolver.resolved_op_arg.as_ref() {
54 resolver.nonnull_args_idx.push(*idx);
55 if typ.nullable {
56 *resolver.may_nullify_output = true;
57 }
58 }
59 Ok(resolver)
60 }
61
62 pub fn optional(self) -> Option<ResolvedOpArg> {
63 self.resolved_op_arg.map(|(idx, typ)| ResolvedOpArg {
64 name: self.name,
65 typ,
66 idx,
67 })
68 }
69
70 pub fn required(self) -> Result<ResolvedOpArg> {
71 let Some((idx, typ)) = self.resolved_op_arg else {
72 api_bail!("Required argument `{}` is missing", self.name);
73 };
74 Ok(ResolvedOpArg {
75 name: self.name,
76 typ,
77 idx,
78 })
79 }
80}
81
82pub struct ResolvedOpArg {
83 pub name: String,
84 pub typ: EnrichedValueType,
85 pub idx: usize,
86}
87
88pub trait ResolvedOpArgExt: Sized {
89 fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value>;
90 #[allow(dead_code)]
91 fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value>;
92}
93
94impl ResolvedOpArgExt for ResolvedOpArg {
95 fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value> {
96 if self.idx >= args.len() {
97 api_bail!(
98 "Two few arguments, {} provided, expected at least {} for `{}`",
99 args.len(),
100 self.idx + 1,
101 self.name
102 );
103 }
104 Ok(&args[self.idx])
105 }
106
107 fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value> {
108 if self.idx >= args.len() {
109 api_bail!(
110 "Two few arguments, {} provided, expected at least {} for `{}`",
111 args.len(),
112 self.idx + 1,
113 self.name
114 );
115 }
116 Ok(std::mem::take(&mut args[self.idx]))
117 }
118}
119
120impl ResolvedOpArgExt for Option<ResolvedOpArg> {
121 fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value> {
122 Ok(self
123 .as_ref()
124 .map(|arg| arg.value(args))
125 .transpose()?
126 .unwrap_or(&value::Value::Null))
127 }
128
129 fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value> {
130 Ok(self
131 .as_ref()
132 .map(|arg| arg.take_value(args))
133 .transpose()?
134 .unwrap_or(value::Value::Null))
135 }
136}
137
138pub struct OpArgsResolver<'a> {
139 args: &'a [OpArgSchema],
140 num_positional_args: usize,
141 next_positional_idx: usize,
142 remaining_kwargs: HashMap<&'a str, usize>,
143 nonnull_args_idx: &'a mut Vec<usize>,
144 may_nullify_output: &'a mut bool,
145}
146
147impl<'a> OpArgsResolver<'a> {
148 pub fn new(
149 args: &'a [OpArgSchema],
150 nonnull_args_idx: &'a mut Vec<usize>,
151 may_nullify_output: &'a mut bool,
152 ) -> Result<Self> {
153 let mut num_positional_args = 0;
154 let mut kwargs = HashMap::new();
155 for (idx, arg) in args.iter().enumerate() {
156 if let Some(name) = &arg.name.0 {
157 kwargs.insert(name.as_str(), idx);
158 } else {
159 if !kwargs.is_empty() {
160 api_bail!("Positional arguments must be provided before keyword arguments");
161 }
162 num_positional_args += 1;
163 }
164 }
165 Ok(Self {
166 args,
167 num_positional_args,
168 next_positional_idx: 0,
169 remaining_kwargs: kwargs,
170 nonnull_args_idx,
171 may_nullify_output,
172 })
173 }
174
175 pub fn next_arg<'arg>(&'arg mut self, name: &str) -> Result<OpArgResolver<'arg>> {
176 let idx = if let Some(idx) = self.remaining_kwargs.remove(name) {
177 if self.next_positional_idx < self.num_positional_args {
178 api_bail!("`{name}` is provided as both positional and keyword arguments");
179 } else {
180 Some(idx)
181 }
182 } else if self.next_positional_idx < self.num_positional_args {
183 let idx = self.next_positional_idx;
184 self.next_positional_idx += 1;
185 Some(idx)
186 } else {
187 None
188 };
189 Ok(OpArgResolver {
190 name: name.to_string(),
191 resolved_op_arg: idx.map(|idx| (idx, self.args[idx].value_type.clone())),
192 nonnull_args_idx: self.nonnull_args_idx,
193 may_nullify_output: self.may_nullify_output,
194 })
195 }
196
197 pub fn done(self) -> Result<()> {
198 if self.next_positional_idx < self.num_positional_args {
199 api_bail!(
200 "Expected {} positional arguments, got {}",
201 self.next_positional_idx,
202 self.num_positional_args
203 );
204 }
205 if !self.remaining_kwargs.is_empty() {
206 api_bail!(
207 "Unexpected keyword arguments: {}",
208 self.remaining_kwargs
209 .keys()
210 .map(|k| format!("`{k}`"))
211 .collect::<Vec<_>>()
212 .join(", ")
213 )
214 }
215 Ok(())
216 }
217
218 pub fn get_analyze_value(&self, resolved_arg: &ResolvedOpArg) -> &AnalyzedValueMapping {
219 &self.args[resolved_arg.idx].analyzed_value
220 }
221}
222
223#[async_trait]
228pub trait SourceFactoryBase: SourceFactory + Send + Sync + 'static {
229 type Spec: DeserializeOwned + Send + Sync;
230
231 fn name(&self) -> &str;
232
233 async fn get_output_schema(
234 &self,
235 spec: &Self::Spec,
236 context: &FlowInstanceContext,
237 ) -> Result<EnrichedValueType>;
238
239 async fn build_executor(
240 self: Arc<Self>,
241 source_name: &str,
242 spec: Self::Spec,
243 context: Arc<FlowInstanceContext>,
244 ) -> Result<Box<dyn SourceExecutor>>;
245
246 fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
247 where
248 Self: Sized,
249 {
250 registry.register(
251 self.name().to_string(),
252 ExecutorFactory::Source(Arc::new(self)),
253 )
254 }
255}
256
257#[async_trait]
258impl<T: SourceFactoryBase> SourceFactory for T {
259 async fn build(
260 self: Arc<Self>,
261 source_name: &str,
262 spec: serde_json::Value,
263 context: Arc<FlowInstanceContext>,
264 ) -> Result<(
265 EnrichedValueType,
266 BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
267 )> {
268 let spec: T::Spec = utils::deser::from_json_value(spec)
269 .map_err(Error::from)
270 .with_context(|| format!("Failed in parsing spec for source `{source_name}`"))?;
271 let output_schema = self.get_output_schema(&spec, &context).await?;
272 let source_name = source_name.to_string();
273 let executor = async move { self.build_executor(&source_name, spec, context).await };
274 Ok((output_schema, Box::pin(executor)))
275 }
276}
277
278pub struct SimpleFunctionAnalysisOutput<T: Send + Sync> {
283 pub resolved_args: T,
284 pub output_schema: EnrichedValueType,
285 pub behavior_version: Option<u32>,
286}
287
288#[async_trait]
289pub trait SimpleFunctionFactoryBase: SimpleFunctionFactory + Send + Sync + 'static {
290 type Spec: DeserializeOwned + Send + Sync;
291 type ResolvedArgs: Send + Sync;
292
293 fn name(&self) -> &str;
294
295 async fn analyze<'a>(
296 &'a self,
297 spec: &'a Self::Spec,
298 args_resolver: &mut OpArgsResolver<'a>,
299 context: &FlowInstanceContext,
300 ) -> Result<SimpleFunctionAnalysisOutput<Self::ResolvedArgs>>;
301
302 async fn build_executor(
303 self: Arc<Self>,
304 spec: Self::Spec,
305 resolved_args: Self::ResolvedArgs,
306 context: Arc<FlowInstanceContext>,
307 ) -> Result<impl SimpleFunctionExecutor>;
308
309 fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
310 where
311 Self: Sized,
312 {
313 registry.register(
314 self.name().to_string(),
315 ExecutorFactory::SimpleFunction(Arc::new(self)),
316 )
317 }
318}
319
320struct FunctionExecutorWrapper<E: SimpleFunctionExecutor> {
321 executor: E,
322 nonnull_args_idx: Vec<usize>,
323}
324
325#[async_trait]
326impl<E: SimpleFunctionExecutor> SimpleFunctionExecutor for FunctionExecutorWrapper<E> {
327 async fn evaluate(&self, args: Vec<value::Value>) -> Result<value::Value> {
328 for idx in &self.nonnull_args_idx {
329 if args[*idx].is_null() {
330 return Ok(value::Value::Null);
331 }
332 }
333 self.executor.evaluate(args).await
334 }
335
336 fn enable_cache(&self) -> bool {
337 self.executor.enable_cache()
338 }
339}
340
341#[async_trait]
342impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
343 async fn build(
344 self: Arc<Self>,
345 spec: serde_json::Value,
346 input_schema: Vec<OpArgSchema>,
347 context: Arc<FlowInstanceContext>,
348 ) -> Result<SimpleFunctionBuildOutput> {
349 let spec: T::Spec = utils::deser::from_json_value(spec)
350 .map_err(Error::from)
351 .with_context(|| format!("Failed in parsing spec for function `{}`", self.name()))?;
352 let mut nonnull_args_idx = vec![];
353 let mut may_nullify_output = false;
354 let mut args_resolver = OpArgsResolver::new(
355 &input_schema,
356 &mut nonnull_args_idx,
357 &mut may_nullify_output,
358 )?;
359 let SimpleFunctionAnalysisOutput {
360 resolved_args,
361 mut output_schema,
362 behavior_version,
363 } = self.analyze(&spec, &mut args_resolver, &context).await?;
364 args_resolver.done()?;
365
366 if may_nullify_output {
368 output_schema.nullable = true;
369 }
370
371 let executor = async move {
372 Ok(Box::new(FunctionExecutorWrapper {
373 executor: self.build_executor(spec, resolved_args, context).await?,
374 nonnull_args_idx,
375 }) as Box<dyn SimpleFunctionExecutor>)
376 };
377 Ok(SimpleFunctionBuildOutput {
378 output_type: output_schema,
379 behavior_version,
380 executor: Box::pin(executor),
381 })
382 }
383}
384
385#[async_trait]
386pub trait BatchedFunctionExecutor: Send + Sync + Sized + 'static {
387 async fn evaluate_batch(&self, args: Vec<Vec<value::Value>>) -> Result<Vec<value::Value>>;
388
389 fn enable_cache(&self) -> bool {
390 false
391 }
392
393 fn timeout(&self) -> Option<std::time::Duration> {
394 None
395 }
396
397 fn into_fn_executor(self) -> impl SimpleFunctionExecutor {
398 BatchedFunctionExecutorWrapper::new(self)
399 }
400
401 #[cfg(feature = "batching")]
402 fn batching_options(&self) -> batching::BatchingOptions;
403}
404
405#[cfg(feature = "batching")]
406struct BatchedFunctionExecutorRunner<E: BatchedFunctionExecutor>(E);
407
408#[cfg(feature = "batching")]
409#[async_trait]
410impl<E: BatchedFunctionExecutor> batching::Runner for BatchedFunctionExecutorRunner<E> {
411 type Input = Vec<value::Value>;
412 type Output = value::Value;
413
414 async fn run(
415 &self,
416 inputs: Vec<Self::Input>,
417 ) -> Result<impl ExactSizeIterator<Item = Self::Output>> {
418 Ok(self.0.evaluate_batch(inputs).await?.into_iter())
419 }
420}
421
422struct BatchedFunctionExecutorWrapper<E: BatchedFunctionExecutor> {
423 #[cfg(feature = "batching")]
424 batcher: batching::Batcher<BatchedFunctionExecutorRunner<E>>,
425 #[cfg(not(feature = "batching"))]
426 executor: E,
427 enable_cache: bool,
428 timeout: Option<std::time::Duration>,
429}
430
431impl<E: BatchedFunctionExecutor> BatchedFunctionExecutorWrapper<E> {
432 fn new(executor: E) -> Self {
433 let enable_cache = executor.enable_cache();
434 let timeout = executor.timeout();
435 #[cfg(feature = "batching")]
436 {
437 let batching_options = executor.batching_options();
438 Self {
439 enable_cache,
440 timeout,
441 batcher: batching::Batcher::new(
442 BatchedFunctionExecutorRunner(executor),
443 batching_options,
444 ),
445 }
446 }
447 #[cfg(not(feature = "batching"))]
448 {
449 Self {
450 enable_cache,
451 timeout,
452 executor,
453 }
454 }
455 }
456}
457
458#[async_trait]
459impl<E: BatchedFunctionExecutor> SimpleFunctionExecutor for BatchedFunctionExecutorWrapper<E> {
460 async fn evaluate(&self, args: Vec<value::Value>) -> Result<value::Value> {
461 #[cfg(feature = "batching")]
462 {
463 self.batcher.run(args).await
464 }
465 #[cfg(not(feature = "batching"))]
466 {
467 let results = self.executor.evaluate_batch(vec![args]).await?;
468 results
469 .into_iter()
470 .next()
471 .ok_or_else(|| internal_error!("Expected exactly one result from evaluate_batch"))
472 }
473 }
474
475 fn enable_cache(&self) -> bool {
476 self.enable_cache
477 }
478 fn timeout(&self) -> Option<std::time::Duration> {
479 self.timeout
480 }
481}
482
483pub struct TypedExportDataCollectionBuildOutput<F: TargetFactoryBase + ?Sized> {
488 pub export_context: BoxFuture<'static, Result<Arc<F::ExportContext>>>,
489 pub setup_key: F::SetupKey,
490 pub desired_setup_state: F::SetupState,
491}
492pub struct TypedExportDataCollectionSpec<F: TargetFactoryBase + ?Sized> {
493 pub name: String,
494 pub spec: F::Spec,
495 pub key_fields_schema: Box<[FieldSchema]>,
496 pub value_fields_schema: Vec<FieldSchema>,
497 pub index_options: IndexOptions,
498}
499
500pub struct TypedResourceSetupChangeItem<'a, F: TargetFactoryBase + ?Sized> {
501 pub key: F::SetupKey,
502 pub setup_change: &'a F::SetupChange,
503}
504
505#[async_trait]
506pub trait TargetFactoryBase: Send + Sync + 'static {
507 type Spec: DeserializeOwned + Send + Sync;
508 type DeclarationSpec: DeserializeOwned + Send + Sync;
509
510 type SetupKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
511 type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
512 type SetupChange: ResourceSetupChange;
513
514 type ExportContext: Send + Sync + 'static;
515
516 fn name(&self) -> &str;
517
518 async fn build(
519 self: Arc<Self>,
520 data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
521 declarations: Vec<Self::DeclarationSpec>,
522 context: Arc<FlowInstanceContext>,
523 ) -> Result<(
524 Vec<TypedExportDataCollectionBuildOutput<Self>>,
525 Vec<(Self::SetupKey, Self::SetupState)>,
526 )>;
527
528 fn deserialize_setup_key(key: serde_json::Value) -> Result<Self::SetupKey> {
531 Ok(utils::deser::from_json_value(key)?)
532 }
533
534 async fn diff_setup_states(
537 &self,
538 key: Self::SetupKey,
539 desired_state: Option<Self::SetupState>,
540 existing_states: setup::CombinedState<Self::SetupState>,
541 flow_instance_ctx: Arc<FlowInstanceContext>,
542 ) -> Result<Self::SetupChange>;
543
544 fn check_state_compatibility(
545 &self,
546 desired_state: &Self::SetupState,
547 existing_state: &Self::SetupState,
548 ) -> Result<SetupStateCompatibility>;
549
550 fn describe_resource(&self, key: &Self::SetupKey) -> Result<String>;
551
552 fn extract_additional_key(
553 &self,
554 _key: &value::KeyValue,
555 _value: &value::FieldValues,
556 _export_context: &Self::ExportContext,
557 ) -> Result<serde_json::Value> {
558 Ok(serde_json::Value::Null)
559 }
560
561 fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
562 where
563 Self: Sized,
564 {
565 registry.register(
566 self.name().to_string(),
567 ExecutorFactory::ExportTarget(Arc::new(self)),
568 )
569 }
570
571 async fn apply_mutation(
572 &self,
573 mutations: Vec<ExportTargetMutationWithContext<'async_trait, Self::ExportContext>>,
574 ) -> Result<()>;
575
576 async fn apply_setup_changes(
577 &self,
578 setup_change: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
579 context: Arc<FlowInstanceContext>,
580 ) -> Result<()>;
581}
582
583#[async_trait]
584impl<T: TargetFactoryBase> TargetFactory for T {
585 async fn build(
586 self: Arc<Self>,
587 data_collections: Vec<interface::ExportDataCollectionSpec>,
588 declarations: Vec<serde_json::Value>,
589 context: Arc<FlowInstanceContext>,
590 ) -> Result<(
591 Vec<interface::ExportDataCollectionBuildOutput>,
592 Vec<(serde_json::Value, serde_json::Value)>,
593 )> {
594 let (data_coll_output, decl_output) = TargetFactoryBase::build(
595 self,
596 data_collections
597 .into_iter()
598 .map(|d| -> Result<_> {
599 Ok(TypedExportDataCollectionSpec {
600 spec: utils::deser::from_json_value(d.spec)
601 .map_err(Error::from)
602 .with_context(|| {
603 format!("Failed in parsing spec for target `{}`", d.name)
604 })?,
605 name: d.name,
606 key_fields_schema: d.key_fields_schema,
607 value_fields_schema: d.value_fields_schema,
608 index_options: d.index_options,
609 })
610 })
611 .collect::<Result<Vec<_>>>()?,
612 declarations
613 .into_iter()
614 .map(|d| -> Result<_> { Ok(utils::deser::from_json_value(d)?) })
615 .collect::<Result<Vec<_>>>()?,
616 context,
617 )
618 .await?;
619
620 let data_coll_output = data_coll_output
621 .into_iter()
622 .map(|d| {
623 Ok(interface::ExportDataCollectionBuildOutput {
624 export_context: async move {
625 Ok(d.export_context.await? as Arc<dyn Any + Send + Sync>)
626 }
627 .boxed(),
628 setup_key: serde_json::to_value(d.setup_key)?,
629 desired_setup_state: serde_json::to_value(d.desired_setup_state)?,
630 })
631 })
632 .collect::<Result<Vec<_>>>()?;
633 let decl_output = decl_output
634 .into_iter()
635 .map(|(key, state)| Ok((serde_json::to_value(key)?, serde_json::to_value(state)?)))
636 .collect::<Result<Vec<_>>>()?;
637 Ok((data_coll_output, decl_output))
638 }
639
640 async fn diff_setup_states(
641 &self,
642 key: &serde_json::Value,
643 desired_state: Option<serde_json::Value>,
644 existing_states: setup::CombinedState<serde_json::Value>,
645 flow_instance_ctx: Arc<FlowInstanceContext>,
646 ) -> Result<Box<dyn setup::ResourceSetupChange>> {
647 let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
648 let desired_state: Option<T::SetupState> = desired_state
649 .map(|v| utils::deser::from_json_value(v.clone()))
650 .transpose()?;
651 let existing_states = from_json_combined_state(existing_states)?;
652 let setup_change = TargetFactoryBase::diff_setup_states(
653 self,
654 key,
655 desired_state,
656 existing_states,
657 flow_instance_ctx,
658 )
659 .await?;
660 Ok(Box::new(setup_change))
661 }
662
663 fn describe_resource(&self, key: &serde_json::Value) -> Result<String> {
664 let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
665 TargetFactoryBase::describe_resource(self, &key)
666 }
667
668 fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value> {
669 let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
670 Ok(serde_json::to_value(key)?)
671 }
672
673 fn check_state_compatibility(
674 &self,
675 desired_state: &serde_json::Value,
676 existing_state: &serde_json::Value,
677 ) -> Result<SetupStateCompatibility> {
678 let result = TargetFactoryBase::check_state_compatibility(
679 self,
680 &utils::deser::from_json_value(desired_state.clone())?,
681 &utils::deser::from_json_value(existing_state.clone())?,
682 )?;
683 Ok(result)
684 }
685
686 fn extract_additional_key(
689 &self,
690 key: &value::KeyValue,
691 value: &value::FieldValues,
692 export_context: &(dyn Any + Send + Sync),
693 ) -> Result<serde_json::Value> {
694 TargetFactoryBase::extract_additional_key(
695 self,
696 key,
697 value,
698 export_context
699 .downcast_ref::<T::ExportContext>()
700 .ok_or_else(invariance_violation)?,
701 )
702 }
703
704 async fn apply_mutation(
705 &self,
706 mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
707 ) -> Result<()> {
708 let mutations = mutations
709 .into_iter()
710 .map(|m| -> Result<_> {
711 Ok(ExportTargetMutationWithContext {
712 mutation: m.mutation,
713 export_context: m
714 .export_context
715 .downcast_ref::<T::ExportContext>()
716 .ok_or_else(invariance_violation)?,
717 })
718 })
719 .collect::<Result<_>>()?;
720 TargetFactoryBase::apply_mutation(self, mutations).await
721 }
722
723 async fn apply_setup_changes(
724 &self,
725 setup_change: Vec<ResourceSetupChangeItem<'async_trait>>,
726 context: Arc<FlowInstanceContext>,
727 ) -> Result<()> {
728 TargetFactoryBase::apply_setup_changes(
729 self,
730 setup_change
731 .into_iter()
732 .map(|item| -> Result<_> {
733 Ok(TypedResourceSetupChangeItem {
734 key: utils::deser::from_json_value(item.key.clone())?,
735 setup_change: (item.setup_change as &dyn Any)
736 .downcast_ref::<T::SetupChange>()
737 .ok_or_else(invariance_violation)?,
738 })
739 })
740 .collect::<Result<Vec<_>>>()?,
741 context,
742 )
743 .await
744 }
745}
746fn from_json_combined_state<T: Debug + Clone + Serialize + DeserializeOwned>(
747 existing_states: setup::CombinedState<serde_json::Value>,
748) -> Result<setup::CombinedState<T>> {
749 Ok(setup::CombinedState {
750 current: existing_states
751 .current
752 .map(|v| utils::deser::from_json_value(v))
753 .transpose()?,
754 staging: existing_states
755 .staging
756 .into_iter()
757 .map(|v| -> Result<_> {
758 Ok(match v {
759 setup::StateChange::Upsert(v) => {
760 setup::StateChange::Upsert(utils::deser::from_json_value(v)?)
761 }
762 setup::StateChange::Delete => setup::StateChange::Delete,
763 })
764 })
765 .collect::<Result<_>>()?,
766 legacy_state_key: existing_states.legacy_state_key,
767 })
768}
769
770pub struct TypedTargetAttachmentState<F: TargetSpecificAttachmentFactoryBase + ?Sized> {
775 pub setup_key: F::SetupKey,
776 pub setup_state: F::SetupState,
777}
778
779#[async_trait]
781pub trait TargetSpecificAttachmentFactoryBase: Send + Sync + 'static {
782 type TargetKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
783 type TargetSpec: DeserializeOwned + Send + Sync;
784 type Spec: DeserializeOwned + Send + Sync;
785 type SetupKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
786 type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
787 type SetupChange: interface::AttachmentSetupChange + Send + Sync;
788
789 fn name(&self) -> &str;
790
791 fn get_state(
792 &self,
793 target_name: &str,
794 target_spec: &Self::TargetSpec,
795 attachment_spec: Self::Spec,
796 ) -> Result<TypedTargetAttachmentState<Self>>;
797
798 async fn diff_setup_states(
799 &self,
800 target_key: &Self::TargetKey,
801 attachment_key: &Self::SetupKey,
802 new_state: Option<Self::SetupState>,
803 existing_states: setup::CombinedState<Self::SetupState>,
804 context: &interface::FlowInstanceContext,
805 ) -> Result<Option<Self::SetupChange>>;
806
807 fn deserialize_setup_key(key: serde_json::Value) -> Result<Self::SetupKey> {
810 Ok(utils::deser::from_json_value(key)?)
811 }
812
813 fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
814 where
815 Self: Sized,
816 {
817 registry.register(
818 self.name().to_string(),
819 ExecutorFactory::TargetAttachment(Arc::new(self)),
820 )
821 }
822}
823
824#[async_trait]
825impl<T: TargetSpecificAttachmentFactoryBase> TargetAttachmentFactory for T {
826 fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value> {
827 let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
828 Ok(serde_json::to_value(key)?)
829 }
830
831 fn get_state(
832 &self,
833 target_name: &str,
834 target_spec: &serde_json::Map<String, serde_json::Value>,
835 attachment_spec: serde_json::Value,
836 ) -> Result<interface::TargetAttachmentState> {
837 let state = TargetSpecificAttachmentFactoryBase::get_state(
838 self,
839 target_name,
840 &utils::deser::from_json_value(serde_json::Value::Object(target_spec.clone()))?,
841 utils::deser::from_json_value(attachment_spec)?,
842 )?;
843 Ok(interface::TargetAttachmentState {
844 setup_key: serde_json::to_value(state.setup_key)?,
845 setup_state: serde_json::to_value(state.setup_state)?,
846 })
847 }
848
849 async fn diff_setup_states(
850 &self,
851 target_key: &serde_json::Value,
852 attachment_key: &serde_json::Value,
853 new_state: Option<serde_json::Value>,
854 existing_states: setup::CombinedState<serde_json::Value>,
855 context: &interface::FlowInstanceContext,
856 ) -> Result<Option<Box<dyn AttachmentSetupChange + Send + Sync>>> {
857 let setup_change = self
858 .diff_setup_states(
859 &utils::deser::from_json_value(target_key.clone())?,
860 &utils::deser::from_json_value(attachment_key.clone())?,
861 new_state.map(utils::deser::from_json_value).transpose()?,
862 from_json_combined_state(existing_states)?,
863 context,
864 )
865 .await?;
866 Ok(setup_change.map(|s| Box::new(s) as Box<dyn AttachmentSetupChange + Send + Sync>))
867 }
868}