zen_engine/handler/function/
mod.rs

1use std::rc::Rc;
2use std::time::Duration;
3
4use ::serde::{Deserialize, Serialize};
5use anyhow::anyhow;
6use rquickjs::{async_with, CatchResultExt, Object};
7use serde_json::json;
8
9use crate::handler::function::error::FunctionResult;
10use crate::handler::function::function::{Function, HandlerResponse};
11use crate::handler::function::module::console::Log;
12use crate::handler::function::serde::JsValue;
13use crate::handler::node::{NodeRequest, NodeResponse, NodeResult, PartialTraceError};
14use crate::model::{DecisionNodeKind, FunctionNodeContent};
15
16pub(crate) mod error;
17pub(crate) mod function;
18pub(crate) mod listener;
19pub(crate) mod module;
20pub(crate) mod serde;
21
22#[derive(Serialize, Deserialize)]
23pub struct FunctionResponse {
24    performance: String,
25    data: Option<HandlerResponse>,
26}
27
28pub struct FunctionHandler {
29    function: Rc<Function>,
30    trace: bool,
31    iteration: u8,
32    max_depth: u8,
33}
34
/// Wall-clock budget for a single function-node run; once this much time has elapsed since
/// the run started, the interrupt handler installed by `FunctionHandler::handle` reports
/// `true`.
static MAX_DURATION: Duration = Duration::from_secs(5);
36
37impl FunctionHandler {
38    pub fn new(function: Rc<Function>, trace: bool, iteration: u8, max_depth: u8) -> Self {
39        Self {
40            function,
41            trace,
42            iteration,
43            max_depth,
44        }
45    }
46
47    pub async fn handle(&self, request: NodeRequest) -> NodeResult {
48        let content = match &request.node.kind {
49            DecisionNodeKind::FunctionNode { content } => match content {
50                FunctionNodeContent::Version2(content) => Ok(content),
51                _ => Err(anyhow!("Unexpected node type")),
52            },
53            _ => Err(anyhow!("Unexpected node type")),
54        }?;
55        let start = std::time::Instant::now();
56
57        let module_name = self
58            .function
59            .suggest_module_name(request.node.id.as_str(), &content.source);
60        let interrupt_handler = Box::new(move || start.elapsed() > MAX_DURATION);
61        self.function
62            .runtime()
63            .set_interrupt_handler(Some(interrupt_handler))
64            .await;
65
66        self.attach_globals()
67            .await
68            .map_err(|e| anyhow!(e.to_string()))?;
69
70        self.function
71            .register_module(&module_name, content.source.as_str())
72            .await
73            .map_err(|e| anyhow!(e.to_string()))?;
74
75        let response_result = self
76            .function
77            .call_handler(&module_name, JsValue(request.input.clone()))
78            .await;
79
80        match response_result {
81            Ok(response) => {
82                self.function.runtime().set_interrupt_handler(None).await;
83
84                Ok(NodeResponse {
85                    output: response.data,
86                    trace_data: self.trace.then(|| json!({ "log": response.logs })),
87                })
88            }
89            Err(e) => {
90                let mut log = self.function.extract_logs().await;
91                log.push(Log {
92                    lines: vec![json!(e.to_string()).to_string()],
93                    ms_since_run: start.elapsed().as_millis() as usize,
94                });
95
96                Err(anyhow!(PartialTraceError {
97                    message: e.to_string(),
98                    trace: Some(json!({ "log": log })),
99                }))
100            }
101        }
102    }
103
104    async fn attach_globals(&self) -> FunctionResult {
105        async_with!(self.function.context() => |ctx| {
106            let config = Object::new(ctx.clone()).catch(&ctx)?;
107
108            config.prop("iteration", self.iteration).catch(&ctx)?;
109            config.prop("maxDepth", self.max_depth).catch(&ctx)?;
110            config.prop("trace", self.trace).catch(&ctx)?;
111
112            ctx.globals().set("config", config).catch(&ctx)?;
113
114            Ok(())
115        })
116        .await
117    }
118}