Skip to main content

thread_flow/functions/
calls.rs

1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4use async_trait::async_trait;
5use recoco::base::schema::{EnrichedValueType, TableKind, TableSchema, ValueType};
6use recoco::base::value::Value;
7use recoco::ops::factory_bases::SimpleFunctionFactoryBase;
8use recoco::ops::interface::{FlowInstanceContext, SimpleFunctionExecutor};
9use recoco::ops::sdk::{OpArgsResolver, SimpleFunctionAnalysisOutput};
10use serde::Deserialize;
11use std::sync::Arc;
12
/// Factory for the `extract_calls` operator.
///
/// A stateless unit type: its `SimpleFunctionFactoryBase` implementation
/// reports the output schema during analysis and builds an
/// `ExtractCallsExecutor`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ExtractCallsFactory;
15
16/// Spec for extract_calls operator (empty - uses default args)
17#[derive(Debug, Clone, Deserialize)]
18pub struct ExtractCallsSpec {}
19
20#[async_trait]
21impl SimpleFunctionFactoryBase for ExtractCallsFactory {
22    type Spec = ExtractCallsSpec;
23    type ResolvedArgs = ();
24
25    fn name(&self) -> &str {
26        "extract_calls"
27    }
28
29    async fn analyze<'a>(
30        &'a self,
31        _spec: &'a Self::Spec,
32        _args_resolver: &mut OpArgsResolver<'a>,
33        _context: &FlowInstanceContext,
34    ) -> Result<SimpleFunctionAnalysisOutput<Self::ResolvedArgs>, recoco::prelude::Error> {
35        Ok(SimpleFunctionAnalysisOutput {
36            resolved_args: (),
37            output_schema: get_calls_output_schema(),
38            behavior_version: Some(1),
39        })
40    }
41
42    async fn build_executor(
43        self: Arc<Self>,
44        _spec: Self::Spec,
45        _resolved_args: Self::ResolvedArgs,
46        _context: Arc<FlowInstanceContext>,
47    ) -> Result<impl SimpleFunctionExecutor, recoco::prelude::Error> {
48        Ok(ExtractCallsExecutor)
49    }
50}
51
/// Executor that pulls the calls table out of a parsed document.
///
/// A stateless unit type; all behavior lives in its
/// `SimpleFunctionExecutor` implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct ExtractCallsExecutor;
54
55#[async_trait]
56impl SimpleFunctionExecutor for ExtractCallsExecutor {
57    async fn evaluate(&self, input: Vec<Value>) -> Result<Value, recoco::prelude::Error> {
58        // Input: parsed_document (Struct with fields: symbols, imports, calls)
59        let parsed_doc = input
60            .first()
61            .ok_or_else(|| recoco::prelude::Error::client("Missing parsed_document input"))?;
62
63        // Extract the third field (calls table)
64        match parsed_doc {
65            Value::Struct(field_values) => {
66                let calls = field_values
67                    .fields
68                    .get(2)
69                    .ok_or_else(|| {
70                        recoco::prelude::Error::client("Missing calls field in parsed_document")
71                    })?
72                    .clone();
73
74                Ok(calls)
75            }
76            _ => Err(recoco::prelude::Error::client(
77                "Expected Struct for parsed_document",
78            )),
79        }
80    }
81
82    fn enable_cache(&self) -> bool {
83        true
84    }
85
86    fn timeout(&self) -> Option<std::time::Duration> {
87        Some(std::time::Duration::from_secs(30))
88    }
89}
90
91/// Build the schema for the output of ExtractCalls (just the calls table)
92fn get_calls_output_schema() -> EnrichedValueType {
93    EnrichedValueType {
94        typ: ValueType::Table(TableSchema {
95            kind: TableKind::LTable,
96            row: match crate::conversion::call_type() {
97                ValueType::Struct(s) => s,
98                _ => unreachable!(),
99            },
100        }),
101        nullable: false,
102        attrs: Default::default(),
103    }
104}