recoco_core/builder/
analyzed_flow.rs1use crate::{ops::interface::FlowInstanceContext, prelude::*};
14
15use super::{analyzer, plan};
16use recoco_utils::error::{SharedError, SharedResultExt, shared_ok};
17
18pub struct AnalyzedFlow {
19 pub flow_instance: spec::FlowInstanceSpec,
20 pub data_schema: schema::FlowSchema,
21 pub setup_state: exec_ctx::AnalyzedSetupState,
22
23 pub flow_instance_ctx: Arc<FlowInstanceContext>,
24
25 pub execution_plan:
27 Shared<BoxFuture<'static, std::result::Result<Arc<plan::ExecutionPlan>, SharedError>>>,
28}
29
30impl AnalyzedFlow {
31 pub async fn from_flow_instance(
32 flow_instance: crate::base::spec::FlowInstanceSpec,
33 flow_instance_ctx: Arc<FlowInstanceContext>,
34 ) -> Result<Self> {
35 let (data_schema, setup_state, execution_plan_fut) =
36 analyzer::analyze_flow(&flow_instance, flow_instance_ctx.clone())
37 .await
38 .with_context(|| format!("analyzing flow `{}`", flow_instance.name))?;
39 let execution_plan = async move {
40 shared_ok(Arc::new(
41 execution_plan_fut.await.map_err(SharedError::from)?,
42 ))
43 }
44 .boxed()
45 .shared();
46 let result = Self {
47 flow_instance,
48 data_schema,
49 setup_state,
50 flow_instance_ctx,
51 execution_plan,
52 };
53 Ok(result)
54 }
55
56 pub async fn get_execution_plan(&self) -> Result<Arc<plan::ExecutionPlan>> {
57 let execution_plan = self.execution_plan.clone().await.into_result()?;
58 Ok(execution_plan)
59 }
60}
61
62pub struct AnalyzedTransientFlow {
63 pub transient_flow_instance: spec::TransientFlowSpec,
64 pub data_schema: schema::FlowSchema,
65 pub execution_plan: plan::TransientExecutionPlan,
66 pub output_type: schema::EnrichedValueType,
67}
68
69impl AnalyzedTransientFlow {
70 pub async fn from_transient_flow(transient_flow: spec::TransientFlowSpec) -> Result<Self> {
71 let ctx = analyzer::build_flow_instance_context(&transient_flow.name);
72 let (output_type, data_schema, execution_plan_fut) =
73 analyzer::analyze_transient_flow(&transient_flow, ctx).await?;
74 Ok(Self {
75 transient_flow_instance: transient_flow,
76 data_schema,
77 execution_plan: execution_plan_fut.await?,
78 output_type,
79 })
80 }
81}