Skip to main content

recoco_core/ops/
factory_bases.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
13use crate::prelude::*;
14use crate::setup::ResourceSetupChange;
15use std::fmt::Debug;
16use std::hash::Hash;
17
18use super::interface::*;
19use super::registry::*;
20use crate::base::schema::*;
21use crate::base::spec::*;
22use crate::builder::plan::AnalyzedValueMapping;
23use crate::setup;
24
25////////////////////////////////////////////////////////
26// Op Args
27////////////////////////////////////////////////////////
28
29pub struct OpArgResolver<'arg> {
30    name: String,
31    resolved_op_arg: Option<(usize, EnrichedValueType)>,
32    nonnull_args_idx: &'arg mut Vec<usize>,
33    may_nullify_output: &'arg mut bool,
34}
35
36impl<'arg> OpArgResolver<'arg> {
37    pub fn expect_nullable_type(self, expected_type: &ValueType) -> Result<Self> {
38        let Some((_, typ)) = &self.resolved_op_arg else {
39            return Ok(self);
40        };
41        if &typ.typ != expected_type {
42            api_bail!(
43                "Expected argument `{}` to be of type `{}`, got `{}`",
44                self.name,
45                expected_type,
46                typ.typ
47            );
48        }
49        Ok(self)
50    }
51    pub fn expect_type(self, expected_type: &ValueType) -> Result<Self> {
52        let resolver = self.expect_nullable_type(expected_type)?;
53        if let Some((idx, typ)) = resolver.resolved_op_arg.as_ref() {
54            resolver.nonnull_args_idx.push(*idx);
55            if typ.nullable {
56                *resolver.may_nullify_output = true;
57            }
58        }
59        Ok(resolver)
60    }
61
62    pub fn optional(self) -> Option<ResolvedOpArg> {
63        self.resolved_op_arg.map(|(idx, typ)| ResolvedOpArg {
64            name: self.name,
65            typ,
66            idx,
67        })
68    }
69
70    pub fn required(self) -> Result<ResolvedOpArg> {
71        let Some((idx, typ)) = self.resolved_op_arg else {
72            api_bail!("Required argument `{}` is missing", self.name);
73        };
74        Ok(ResolvedOpArg {
75            name: self.name,
76            typ,
77            idx,
78        })
79    }
80}
81
82pub struct ResolvedOpArg {
83    pub name: String,
84    pub typ: EnrichedValueType,
85    pub idx: usize,
86}
87
88pub trait ResolvedOpArgExt: Sized {
89    fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value>;
90    #[allow(dead_code)]
91    fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value>;
92}
93
94impl ResolvedOpArgExt for ResolvedOpArg {
95    fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value> {
96        if self.idx >= args.len() {
97            api_bail!(
98                "Two few arguments, {} provided, expected at least {} for `{}`",
99                args.len(),
100                self.idx + 1,
101                self.name
102            );
103        }
104        Ok(&args[self.idx])
105    }
106
107    fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value> {
108        if self.idx >= args.len() {
109            api_bail!(
110                "Two few arguments, {} provided, expected at least {} for `{}`",
111                args.len(),
112                self.idx + 1,
113                self.name
114            );
115        }
116        Ok(std::mem::take(&mut args[self.idx]))
117    }
118}
119
120impl ResolvedOpArgExt for Option<ResolvedOpArg> {
121    fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value> {
122        Ok(self
123            .as_ref()
124            .map(|arg| arg.value(args))
125            .transpose()?
126            .unwrap_or(&value::Value::Null))
127    }
128
129    fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value> {
130        Ok(self
131            .as_ref()
132            .map(|arg| arg.take_value(args))
133            .transpose()?
134            .unwrap_or(value::Value::Null))
135    }
136}
137
138pub struct OpArgsResolver<'a> {
139    args: &'a [OpArgSchema],
140    num_positional_args: usize,
141    next_positional_idx: usize,
142    remaining_kwargs: HashMap<&'a str, usize>,
143    nonnull_args_idx: &'a mut Vec<usize>,
144    may_nullify_output: &'a mut bool,
145}
146
147impl<'a> OpArgsResolver<'a> {
148    pub fn new(
149        args: &'a [OpArgSchema],
150        nonnull_args_idx: &'a mut Vec<usize>,
151        may_nullify_output: &'a mut bool,
152    ) -> Result<Self> {
153        let mut num_positional_args = 0;
154        let mut kwargs = HashMap::new();
155        for (idx, arg) in args.iter().enumerate() {
156            if let Some(name) = &arg.name.0 {
157                kwargs.insert(name.as_str(), idx);
158            } else {
159                if !kwargs.is_empty() {
160                    api_bail!("Positional arguments must be provided before keyword arguments");
161                }
162                num_positional_args += 1;
163            }
164        }
165        Ok(Self {
166            args,
167            num_positional_args,
168            next_positional_idx: 0,
169            remaining_kwargs: kwargs,
170            nonnull_args_idx,
171            may_nullify_output,
172        })
173    }
174
175    pub fn next_arg<'arg>(&'arg mut self, name: &str) -> Result<OpArgResolver<'arg>> {
176        let idx = if let Some(idx) = self.remaining_kwargs.remove(name) {
177            if self.next_positional_idx < self.num_positional_args {
178                api_bail!("`{name}` is provided as both positional and keyword arguments");
179            } else {
180                Some(idx)
181            }
182        } else if self.next_positional_idx < self.num_positional_args {
183            let idx = self.next_positional_idx;
184            self.next_positional_idx += 1;
185            Some(idx)
186        } else {
187            None
188        };
189        Ok(OpArgResolver {
190            name: name.to_string(),
191            resolved_op_arg: idx.map(|idx| (idx, self.args[idx].value_type.clone())),
192            nonnull_args_idx: self.nonnull_args_idx,
193            may_nullify_output: self.may_nullify_output,
194        })
195    }
196
197    pub fn done(self) -> Result<()> {
198        if self.next_positional_idx < self.num_positional_args {
199            api_bail!(
200                "Expected {} positional arguments, got {}",
201                self.next_positional_idx,
202                self.num_positional_args
203            );
204        }
205        if !self.remaining_kwargs.is_empty() {
206            api_bail!(
207                "Unexpected keyword arguments: {}",
208                self.remaining_kwargs
209                    .keys()
210                    .map(|k| format!("`{k}`"))
211                    .collect::<Vec<_>>()
212                    .join(", ")
213            )
214        }
215        Ok(())
216    }
217
218    pub fn get_analyze_value(&self, resolved_arg: &ResolvedOpArg) -> &AnalyzedValueMapping {
219        &self.args[resolved_arg.idx].analyzed_value
220    }
221}
222
223////////////////////////////////////////////////////////
224// Source
225////////////////////////////////////////////////////////
226
227#[async_trait]
228pub trait SourceFactoryBase: SourceFactory + Send + Sync + 'static {
229    type Spec: DeserializeOwned + Send + Sync;
230
231    fn name(&self) -> &str;
232
233    async fn get_output_schema(
234        &self,
235        spec: &Self::Spec,
236        context: &FlowInstanceContext,
237    ) -> Result<EnrichedValueType>;
238
239    async fn build_executor(
240        self: Arc<Self>,
241        source_name: &str,
242        spec: Self::Spec,
243        context: Arc<FlowInstanceContext>,
244    ) -> Result<Box<dyn SourceExecutor>>;
245
246    fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
247    where
248        Self: Sized,
249    {
250        registry.register(
251            self.name().to_string(),
252            ExecutorFactory::Source(Arc::new(self)),
253        )
254    }
255}
256
257#[async_trait]
258impl<T: SourceFactoryBase> SourceFactory for T {
259    async fn build(
260        self: Arc<Self>,
261        source_name: &str,
262        spec: serde_json::Value,
263        context: Arc<FlowInstanceContext>,
264    ) -> Result<(
265        EnrichedValueType,
266        BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
267    )> {
268        let spec: T::Spec = utils::deser::from_json_value(spec)
269            .map_err(Error::from)
270            .with_context(|| format!("Failed in parsing spec for source `{source_name}`"))?;
271        let output_schema = self.get_output_schema(&spec, &context).await?;
272        let source_name = source_name.to_string();
273        let executor = async move { self.build_executor(&source_name, spec, context).await };
274        Ok((output_schema, Box::pin(executor)))
275    }
276}
277
278////////////////////////////////////////////////////////
279// Function
280////////////////////////////////////////////////////////
281
282pub struct SimpleFunctionAnalysisOutput<T: Send + Sync> {
283    pub resolved_args: T,
284    pub output_schema: EnrichedValueType,
285    pub behavior_version: Option<u32>,
286}
287
288#[async_trait]
289pub trait SimpleFunctionFactoryBase: SimpleFunctionFactory + Send + Sync + 'static {
290    type Spec: DeserializeOwned + Send + Sync;
291    type ResolvedArgs: Send + Sync;
292
293    fn name(&self) -> &str;
294
295    async fn analyze<'a>(
296        &'a self,
297        spec: &'a Self::Spec,
298        args_resolver: &mut OpArgsResolver<'a>,
299        context: &FlowInstanceContext,
300    ) -> Result<SimpleFunctionAnalysisOutput<Self::ResolvedArgs>>;
301
302    async fn build_executor(
303        self: Arc<Self>,
304        spec: Self::Spec,
305        resolved_args: Self::ResolvedArgs,
306        context: Arc<FlowInstanceContext>,
307    ) -> Result<impl SimpleFunctionExecutor>;
308
309    fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
310    where
311        Self: Sized,
312    {
313        registry.register(
314            self.name().to_string(),
315            ExecutorFactory::SimpleFunction(Arc::new(self)),
316        )
317    }
318}
319
320struct FunctionExecutorWrapper<E: SimpleFunctionExecutor> {
321    executor: E,
322    nonnull_args_idx: Vec<usize>,
323}
324
325#[async_trait]
326impl<E: SimpleFunctionExecutor> SimpleFunctionExecutor for FunctionExecutorWrapper<E> {
327    async fn evaluate(&self, args: Vec<value::Value>) -> Result<value::Value> {
328        for idx in &self.nonnull_args_idx {
329            if args[*idx].is_null() {
330                return Ok(value::Value::Null);
331            }
332        }
333        self.executor.evaluate(args).await
334    }
335
336    fn enable_cache(&self) -> bool {
337        self.executor.enable_cache()
338    }
339}
340
341#[async_trait]
342impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
343    async fn build(
344        self: Arc<Self>,
345        spec: serde_json::Value,
346        input_schema: Vec<OpArgSchema>,
347        context: Arc<FlowInstanceContext>,
348    ) -> Result<SimpleFunctionBuildOutput> {
349        let spec: T::Spec = utils::deser::from_json_value(spec)
350            .map_err(Error::from)
351            .with_context(|| format!("Failed in parsing spec for function `{}`", self.name()))?;
352        let mut nonnull_args_idx = vec![];
353        let mut may_nullify_output = false;
354        let mut args_resolver = OpArgsResolver::new(
355            &input_schema,
356            &mut nonnull_args_idx,
357            &mut may_nullify_output,
358        )?;
359        let SimpleFunctionAnalysisOutput {
360            resolved_args,
361            mut output_schema,
362            behavior_version,
363        } = self.analyze(&spec, &mut args_resolver, &context).await?;
364        args_resolver.done()?;
365
366        // If any required argument is nullable, the output schema should be nullable.
367        if may_nullify_output {
368            output_schema.nullable = true;
369        }
370
371        let executor = async move {
372            Ok(Box::new(FunctionExecutorWrapper {
373                executor: self.build_executor(spec, resolved_args, context).await?,
374                nonnull_args_idx,
375            }) as Box<dyn SimpleFunctionExecutor>)
376        };
377        Ok(SimpleFunctionBuildOutput {
378            output_type: output_schema,
379            behavior_version,
380            executor: Box::pin(executor),
381        })
382    }
383}
384
385#[async_trait]
386pub trait BatchedFunctionExecutor: Send + Sync + Sized + 'static {
387    async fn evaluate_batch(&self, args: Vec<Vec<value::Value>>) -> Result<Vec<value::Value>>;
388
389    fn enable_cache(&self) -> bool {
390        false
391    }
392
393    fn timeout(&self) -> Option<std::time::Duration> {
394        None
395    }
396
397    fn into_fn_executor(self) -> impl SimpleFunctionExecutor {
398        BatchedFunctionExecutorWrapper::new(self)
399    }
400
401    #[cfg(feature = "batching")]
402    fn batching_options(&self) -> batching::BatchingOptions;
403}
404
405#[cfg(feature = "batching")]
406struct BatchedFunctionExecutorRunner<E: BatchedFunctionExecutor>(E);
407
408#[cfg(feature = "batching")]
409#[async_trait]
410impl<E: BatchedFunctionExecutor> batching::Runner for BatchedFunctionExecutorRunner<E> {
411    type Input = Vec<value::Value>;
412    type Output = value::Value;
413
414    async fn run(
415        &self,
416        inputs: Vec<Self::Input>,
417    ) -> Result<impl ExactSizeIterator<Item = Self::Output>> {
418        Ok(self.0.evaluate_batch(inputs).await?.into_iter())
419    }
420}
421
422struct BatchedFunctionExecutorWrapper<E: BatchedFunctionExecutor> {
423    #[cfg(feature = "batching")]
424    batcher: batching::Batcher<BatchedFunctionExecutorRunner<E>>,
425    #[cfg(not(feature = "batching"))]
426    executor: E,
427    enable_cache: bool,
428    timeout: Option<std::time::Duration>,
429}
430
431impl<E: BatchedFunctionExecutor> BatchedFunctionExecutorWrapper<E> {
432    fn new(executor: E) -> Self {
433        let enable_cache = executor.enable_cache();
434        let timeout = executor.timeout();
435        #[cfg(feature = "batching")]
436        {
437            let batching_options = executor.batching_options();
438            Self {
439                enable_cache,
440                timeout,
441                batcher: batching::Batcher::new(
442                    BatchedFunctionExecutorRunner(executor),
443                    batching_options,
444                ),
445            }
446        }
447        #[cfg(not(feature = "batching"))]
448        {
449            Self {
450                enable_cache,
451                timeout,
452                executor,
453            }
454        }
455    }
456}
457
458#[async_trait]
459impl<E: BatchedFunctionExecutor> SimpleFunctionExecutor for BatchedFunctionExecutorWrapper<E> {
460    async fn evaluate(&self, args: Vec<value::Value>) -> Result<value::Value> {
461        #[cfg(feature = "batching")]
462        {
463            self.batcher.run(args).await
464        }
465        #[cfg(not(feature = "batching"))]
466        {
467            let results = self.executor.evaluate_batch(vec![args]).await?;
468            results
469                .into_iter()
470                .next()
471                .ok_or_else(|| internal_error!("Expected exactly one result from evaluate_batch"))
472        }
473    }
474
475    fn enable_cache(&self) -> bool {
476        self.enable_cache
477    }
478    fn timeout(&self) -> Option<std::time::Duration> {
479        self.timeout
480    }
481}
482
483////////////////////////////////////////////////////////
484// Target
485////////////////////////////////////////////////////////
486
487pub struct TypedExportDataCollectionBuildOutput<F: TargetFactoryBase + ?Sized> {
488    pub export_context: BoxFuture<'static, Result<Arc<F::ExportContext>>>,
489    pub setup_key: F::SetupKey,
490    pub desired_setup_state: F::SetupState,
491}
492pub struct TypedExportDataCollectionSpec<F: TargetFactoryBase + ?Sized> {
493    pub name: String,
494    pub spec: F::Spec,
495    pub key_fields_schema: Box<[FieldSchema]>,
496    pub value_fields_schema: Vec<FieldSchema>,
497    pub index_options: IndexOptions,
498}
499
500pub struct TypedResourceSetupChangeItem<'a, F: TargetFactoryBase + ?Sized> {
501    pub key: F::SetupKey,
502    pub setup_change: &'a F::SetupChange,
503}
504
505#[async_trait]
506pub trait TargetFactoryBase: Send + Sync + 'static {
507    type Spec: DeserializeOwned + Send + Sync;
508    type DeclarationSpec: DeserializeOwned + Send + Sync;
509
510    type SetupKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
511    type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
512    type SetupChange: ResourceSetupChange;
513
514    type ExportContext: Send + Sync + 'static;
515
516    fn name(&self) -> &str;
517
518    async fn build(
519        self: Arc<Self>,
520        data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
521        declarations: Vec<Self::DeclarationSpec>,
522        context: Arc<FlowInstanceContext>,
523    ) -> Result<(
524        Vec<TypedExportDataCollectionBuildOutput<Self>>,
525        Vec<(Self::SetupKey, Self::SetupState)>,
526    )>;
527
528    /// Deserialize the setup key from a JSON value.
529    /// You can override this method to provide a custom deserialization logic, e.g. to perform backward compatible deserialization.
530    fn deserialize_setup_key(key: serde_json::Value) -> Result<Self::SetupKey> {
531        Ok(utils::deser::from_json_value(key)?)
532    }
533
534    /// Will not be called if it's setup by user.
535    /// It returns an error if the target only supports setup by user.
536    async fn diff_setup_states(
537        &self,
538        key: Self::SetupKey,
539        desired_state: Option<Self::SetupState>,
540        existing_states: setup::CombinedState<Self::SetupState>,
541        flow_instance_ctx: Arc<FlowInstanceContext>,
542    ) -> Result<Self::SetupChange>;
543
544    fn check_state_compatibility(
545        &self,
546        desired_state: &Self::SetupState,
547        existing_state: &Self::SetupState,
548    ) -> Result<SetupStateCompatibility>;
549
550    fn describe_resource(&self, key: &Self::SetupKey) -> Result<String>;
551
552    fn extract_additional_key(
553        &self,
554        _key: &value::KeyValue,
555        _value: &value::FieldValues,
556        _export_context: &Self::ExportContext,
557    ) -> Result<serde_json::Value> {
558        Ok(serde_json::Value::Null)
559    }
560
561    fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
562    where
563        Self: Sized,
564    {
565        registry.register(
566            self.name().to_string(),
567            ExecutorFactory::ExportTarget(Arc::new(self)),
568        )
569    }
570
571    async fn apply_mutation(
572        &self,
573        mutations: Vec<ExportTargetMutationWithContext<'async_trait, Self::ExportContext>>,
574    ) -> Result<()>;
575
576    async fn apply_setup_changes(
577        &self,
578        setup_change: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
579        context: Arc<FlowInstanceContext>,
580    ) -> Result<()>;
581}
582
583#[async_trait]
584impl<T: TargetFactoryBase> TargetFactory for T {
585    async fn build(
586        self: Arc<Self>,
587        data_collections: Vec<interface::ExportDataCollectionSpec>,
588        declarations: Vec<serde_json::Value>,
589        context: Arc<FlowInstanceContext>,
590    ) -> Result<(
591        Vec<interface::ExportDataCollectionBuildOutput>,
592        Vec<(serde_json::Value, serde_json::Value)>,
593    )> {
594        let (data_coll_output, decl_output) = TargetFactoryBase::build(
595            self,
596            data_collections
597                .into_iter()
598                .map(|d| -> Result<_> {
599                    Ok(TypedExportDataCollectionSpec {
600                        spec: utils::deser::from_json_value(d.spec)
601                            .map_err(Error::from)
602                            .with_context(|| {
603                                format!("Failed in parsing spec for target `{}`", d.name)
604                            })?,
605                        name: d.name,
606                        key_fields_schema: d.key_fields_schema,
607                        value_fields_schema: d.value_fields_schema,
608                        index_options: d.index_options,
609                    })
610                })
611                .collect::<Result<Vec<_>>>()?,
612            declarations
613                .into_iter()
614                .map(|d| -> Result<_> { Ok(utils::deser::from_json_value(d)?) })
615                .collect::<Result<Vec<_>>>()?,
616            context,
617        )
618        .await?;
619
620        let data_coll_output = data_coll_output
621            .into_iter()
622            .map(|d| {
623                Ok(interface::ExportDataCollectionBuildOutput {
624                    export_context: async move {
625                        Ok(d.export_context.await? as Arc<dyn Any + Send + Sync>)
626                    }
627                    .boxed(),
628                    setup_key: serde_json::to_value(d.setup_key)?,
629                    desired_setup_state: serde_json::to_value(d.desired_setup_state)?,
630                })
631            })
632            .collect::<Result<Vec<_>>>()?;
633        let decl_output = decl_output
634            .into_iter()
635            .map(|(key, state)| Ok((serde_json::to_value(key)?, serde_json::to_value(state)?)))
636            .collect::<Result<Vec<_>>>()?;
637        Ok((data_coll_output, decl_output))
638    }
639
640    async fn diff_setup_states(
641        &self,
642        key: &serde_json::Value,
643        desired_state: Option<serde_json::Value>,
644        existing_states: setup::CombinedState<serde_json::Value>,
645        flow_instance_ctx: Arc<FlowInstanceContext>,
646    ) -> Result<Box<dyn setup::ResourceSetupChange>> {
647        let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
648        let desired_state: Option<T::SetupState> = desired_state
649            .map(|v| utils::deser::from_json_value(v.clone()))
650            .transpose()?;
651        let existing_states = from_json_combined_state(existing_states)?;
652        let setup_change = TargetFactoryBase::diff_setup_states(
653            self,
654            key,
655            desired_state,
656            existing_states,
657            flow_instance_ctx,
658        )
659        .await?;
660        Ok(Box::new(setup_change))
661    }
662
663    fn describe_resource(&self, key: &serde_json::Value) -> Result<String> {
664        let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
665        TargetFactoryBase::describe_resource(self, &key)
666    }
667
668    fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value> {
669        let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
670        Ok(serde_json::to_value(key)?)
671    }
672
673    fn check_state_compatibility(
674        &self,
675        desired_state: &serde_json::Value,
676        existing_state: &serde_json::Value,
677    ) -> Result<SetupStateCompatibility> {
678        let result = TargetFactoryBase::check_state_compatibility(
679            self,
680            &utils::deser::from_json_value(desired_state.clone())?,
681            &utils::deser::from_json_value(existing_state.clone())?,
682        )?;
683        Ok(result)
684    }
685
686    /// Extract additional keys that are passed through as part of the mutation to `apply_mutation()`.
687    /// This is useful for targets that need to use additional parts as key for the target (which is not considered as part of the key for cocoindex).
688    fn extract_additional_key(
689        &self,
690        key: &value::KeyValue,
691        value: &value::FieldValues,
692        export_context: &(dyn Any + Send + Sync),
693    ) -> Result<serde_json::Value> {
694        TargetFactoryBase::extract_additional_key(
695            self,
696            key,
697            value,
698            export_context
699                .downcast_ref::<T::ExportContext>()
700                .ok_or_else(invariance_violation)?,
701        )
702    }
703
704    async fn apply_mutation(
705        &self,
706        mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
707    ) -> Result<()> {
708        let mutations = mutations
709            .into_iter()
710            .map(|m| -> Result<_> {
711                Ok(ExportTargetMutationWithContext {
712                    mutation: m.mutation,
713                    export_context: m
714                        .export_context
715                        .downcast_ref::<T::ExportContext>()
716                        .ok_or_else(invariance_violation)?,
717                })
718            })
719            .collect::<Result<_>>()?;
720        TargetFactoryBase::apply_mutation(self, mutations).await
721    }
722
723    async fn apply_setup_changes(
724        &self,
725        setup_change: Vec<ResourceSetupChangeItem<'async_trait>>,
726        context: Arc<FlowInstanceContext>,
727    ) -> Result<()> {
728        TargetFactoryBase::apply_setup_changes(
729            self,
730            setup_change
731                .into_iter()
732                .map(|item| -> Result<_> {
733                    Ok(TypedResourceSetupChangeItem {
734                        key: utils::deser::from_json_value(item.key.clone())?,
735                        setup_change: (item.setup_change as &dyn Any)
736                            .downcast_ref::<T::SetupChange>()
737                            .ok_or_else(invariance_violation)?,
738                    })
739                })
740                .collect::<Result<Vec<_>>>()?,
741            context,
742        )
743        .await
744    }
745}
746fn from_json_combined_state<T: Debug + Clone + Serialize + DeserializeOwned>(
747    existing_states: setup::CombinedState<serde_json::Value>,
748) -> Result<setup::CombinedState<T>> {
749    Ok(setup::CombinedState {
750        current: existing_states
751            .current
752            .map(|v| utils::deser::from_json_value(v))
753            .transpose()?,
754        staging: existing_states
755            .staging
756            .into_iter()
757            .map(|v| -> Result<_> {
758                Ok(match v {
759                    setup::StateChange::Upsert(v) => {
760                        setup::StateChange::Upsert(utils::deser::from_json_value(v)?)
761                    }
762                    setup::StateChange::Delete => setup::StateChange::Delete,
763                })
764            })
765            .collect::<Result<_>>()?,
766        legacy_state_key: existing_states.legacy_state_key,
767    })
768}
769
770////////////////////////////////////////////////////////
771// Target Attachment
772////////////////////////////////////////////////////////
773
774pub struct TypedTargetAttachmentState<F: TargetSpecificAttachmentFactoryBase + ?Sized> {
775    pub setup_key: F::SetupKey,
776    pub setup_state: F::SetupState,
777}
778
779/// A factory for target-specific attachments.
780#[async_trait]
781pub trait TargetSpecificAttachmentFactoryBase: Send + Sync + 'static {
782    type TargetKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
783    type TargetSpec: DeserializeOwned + Send + Sync;
784    type Spec: DeserializeOwned + Send + Sync;
785    type SetupKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
786    type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
787    type SetupChange: interface::AttachmentSetupChange + Send + Sync;
788
789    fn name(&self) -> &str;
790
791    fn get_state(
792        &self,
793        target_name: &str,
794        target_spec: &Self::TargetSpec,
795        attachment_spec: Self::Spec,
796    ) -> Result<TypedTargetAttachmentState<Self>>;
797
798    async fn diff_setup_states(
799        &self,
800        target_key: &Self::TargetKey,
801        attachment_key: &Self::SetupKey,
802        new_state: Option<Self::SetupState>,
803        existing_states: setup::CombinedState<Self::SetupState>,
804        context: &interface::FlowInstanceContext,
805    ) -> Result<Option<Self::SetupChange>>;
806
807    /// Deserialize the setup key from a JSON value.
808    /// You can override this method to provide a custom deserialization logic, e.g. to perform backward compatible deserialization.
809    fn deserialize_setup_key(key: serde_json::Value) -> Result<Self::SetupKey> {
810        Ok(utils::deser::from_json_value(key)?)
811    }
812
813    fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
814    where
815        Self: Sized,
816    {
817        registry.register(
818            self.name().to_string(),
819            ExecutorFactory::TargetAttachment(Arc::new(self)),
820        )
821    }
822}
823
824#[async_trait]
825impl<T: TargetSpecificAttachmentFactoryBase> TargetAttachmentFactory for T {
826    fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value> {
827        let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
828        Ok(serde_json::to_value(key)?)
829    }
830
831    fn get_state(
832        &self,
833        target_name: &str,
834        target_spec: &serde_json::Map<String, serde_json::Value>,
835        attachment_spec: serde_json::Value,
836    ) -> Result<interface::TargetAttachmentState> {
837        let state = TargetSpecificAttachmentFactoryBase::get_state(
838            self,
839            target_name,
840            &utils::deser::from_json_value(serde_json::Value::Object(target_spec.clone()))?,
841            utils::deser::from_json_value(attachment_spec)?,
842        )?;
843        Ok(interface::TargetAttachmentState {
844            setup_key: serde_json::to_value(state.setup_key)?,
845            setup_state: serde_json::to_value(state.setup_state)?,
846        })
847    }
848
849    async fn diff_setup_states(
850        &self,
851        target_key: &serde_json::Value,
852        attachment_key: &serde_json::Value,
853        new_state: Option<serde_json::Value>,
854        existing_states: setup::CombinedState<serde_json::Value>,
855        context: &interface::FlowInstanceContext,
856    ) -> Result<Option<Box<dyn AttachmentSetupChange + Send + Sync>>> {
857        let setup_change = self
858            .diff_setup_states(
859                &utils::deser::from_json_value(target_key.clone())?,
860                &utils::deser::from_json_value(attachment_key.clone())?,
861                new_state.map(utils::deser::from_json_value).transpose()?,
862                from_json_combined_state(existing_states)?,
863                context,
864            )
865            .await?;
866        Ok(setup_change.map(|s| Box::new(s) as Box<dyn AttachmentSetupChange + Send + Sync>))
867    }
868}