thread_flow/functions/
imports.rs1use async_trait::async_trait;
5use recoco::base::schema::{EnrichedValueType, TableKind, TableSchema, ValueType};
6use recoco::base::value::Value;
7use recoco::ops::factory_bases::SimpleFunctionFactoryBase;
8use recoco::ops::interface::{FlowInstanceContext, SimpleFunctionExecutor};
9use recoco::ops::sdk::{OpArgsResolver, SimpleFunctionAnalysisOutput};
10use serde::Deserialize;
11use std::sync::Arc;
12
13pub struct ExtractImportsFactory;
15
16#[derive(Debug, Clone, Deserialize)]
18pub struct ExtractImportsSpec {}
19
20#[async_trait]
21impl SimpleFunctionFactoryBase for ExtractImportsFactory {
22 type Spec = ExtractImportsSpec;
23 type ResolvedArgs = ();
24
25 fn name(&self) -> &str {
26 "extract_imports"
27 }
28
29 async fn analyze<'a>(
30 &'a self,
31 _spec: &'a Self::Spec,
32 _args_resolver: &mut OpArgsResolver<'a>,
33 _context: &FlowInstanceContext,
34 ) -> Result<SimpleFunctionAnalysisOutput<Self::ResolvedArgs>, recoco::prelude::Error> {
35 Ok(SimpleFunctionAnalysisOutput {
36 resolved_args: (),
37 output_schema: get_imports_output_schema(),
38 behavior_version: Some(1),
39 })
40 }
41
42 async fn build_executor(
43 self: Arc<Self>,
44 _spec: Self::Spec,
45 _resolved_args: Self::ResolvedArgs,
46 _context: Arc<FlowInstanceContext>,
47 ) -> Result<impl SimpleFunctionExecutor, recoco::prelude::Error> {
48 Ok(ExtractImportsExecutor)
49 }
50}
51
52pub struct ExtractImportsExecutor;
54
55#[async_trait]
56impl SimpleFunctionExecutor for ExtractImportsExecutor {
57 async fn evaluate(&self, input: Vec<Value>) -> Result<Value, recoco::prelude::Error> {
58 let parsed_doc = input
60 .first()
61 .ok_or_else(|| recoco::prelude::Error::client("Missing parsed_document input"))?;
62
63 match parsed_doc {
65 Value::Struct(field_values) => {
66 let imports = field_values
67 .fields
68 .get(1)
69 .ok_or_else(|| {
70 recoco::prelude::Error::client("Missing imports field in parsed_document")
71 })?
72 .clone();
73
74 Ok(imports)
75 }
76 _ => Err(recoco::prelude::Error::client(
77 "Expected Struct for parsed_document",
78 )),
79 }
80 }
81
82 fn enable_cache(&self) -> bool {
83 true
84 }
85
86 fn timeout(&self) -> Option<std::time::Duration> {
87 Some(std::time::Duration::from_secs(30))
88 }
89}
90
91fn get_imports_output_schema() -> EnrichedValueType {
93 EnrichedValueType {
94 typ: ValueType::Table(TableSchema {
95 kind: TableKind::LTable,
96 row: match crate::conversion::import_type() {
97 ValueType::Struct(s) => s,
98 _ => unreachable!(),
99 },
100 }),
101 nullable: false,
102 attrs: Default::default(),
103 }
104}