Skip to main content

recoco_core/builder/
analyzed_flow.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
13use crate::{ops::interface::FlowInstanceContext, prelude::*};
14
15use super::{analyzer, plan};
16use recoco_utils::error::{SharedError, SharedResultExt, shared_ok};
17
/// Result of analyzing a flow instance.
///
/// Built by `AnalyzedFlow::from_flow_instance`, which fills the schema and
/// setup state from `analyzer::analyze_flow`.
pub struct AnalyzedFlow {
    /// The flow spec that was handed to the analyzer.
    pub flow_instance: spec::FlowInstanceSpec,
    /// Data schema returned by the analyzer for this flow.
    pub data_schema: schema::FlowSchema,
    /// Setup state returned by the analyzer for this flow.
    pub setup_state: exec_ctx::AnalyzedSetupState,

    /// Context supplied by the caller; a clone of it is passed to the analyzer.
    pub flow_instance_ctx: Arc<FlowInstanceContext>,

    /// Cloneable boxed future that resolves to the execution plan, or to a
    /// `SharedError` if building the plan failed.
    ///
    /// This is always a future, never an `Option`: clone it and await the clone
    /// to obtain the plan (as `get_execution_plan` does).
    /// NOTE(review): `Shared` is presumably `futures::future::Shared`, brought
    /// in through the prelude — confirm.
    pub execution_plan:
        Shared<BoxFuture<'static, std::result::Result<Arc<plan::ExecutionPlan>, SharedError>>>,
}
29
30impl AnalyzedFlow {
31    pub async fn from_flow_instance(
32        flow_instance: crate::base::spec::FlowInstanceSpec,
33        flow_instance_ctx: Arc<FlowInstanceContext>,
34    ) -> Result<Self> {
35        let (data_schema, setup_state, execution_plan_fut) =
36            analyzer::analyze_flow(&flow_instance, flow_instance_ctx.clone())
37                .await
38                .with_context(|| format!("analyzing flow `{}`", flow_instance.name))?;
39        let execution_plan = async move {
40            shared_ok(Arc::new(
41                execution_plan_fut.await.map_err(SharedError::from)?,
42            ))
43        }
44        .boxed()
45        .shared();
46        let result = Self {
47            flow_instance,
48            data_schema,
49            setup_state,
50            flow_instance_ctx,
51            execution_plan,
52        };
53        Ok(result)
54    }
55
56    pub async fn get_execution_plan(&self) -> Result<Arc<plan::ExecutionPlan>> {
57        let execution_plan = self.execution_plan.clone().await.into_result()?;
58        Ok(execution_plan)
59    }
60}
61
/// Result of analyzing a transient flow.
///
/// Built by `AnalyzedTransientFlow::from_transient_flow`. Unlike
/// `AnalyzedFlow`, the execution plan is stored as a value, not as a future.
pub struct AnalyzedTransientFlow {
    /// The transient flow spec that was analyzed.
    pub transient_flow_instance: spec::TransientFlowSpec,
    /// Data schema returned by `analyzer::analyze_transient_flow`.
    pub data_schema: schema::FlowSchema,
    /// Execution plan, already awaited at construction time.
    pub execution_plan: plan::TransientExecutionPlan,
    /// Output type returned by `analyzer::analyze_transient_flow`.
    pub output_type: schema::EnrichedValueType,
}
68
69impl AnalyzedTransientFlow {
70    pub async fn from_transient_flow(transient_flow: spec::TransientFlowSpec) -> Result<Self> {
71        let ctx = analyzer::build_flow_instance_context(&transient_flow.name);
72        let (output_type, data_schema, execution_plan_fut) =
73            analyzer::analyze_transient_flow(&transient_flow, ctx).await?;
74        Ok(Self {
75            transient_flow_instance: transient_flow,
76            data_schema,
77            execution_plan: execution_plan_fut.await?,
78            output_type,
79        })
80    }
81}