zen_engine/handler/function/
mod.rs1use std::rc::Rc;
2use std::time::Duration;
3
4use ::serde::{Deserialize, Serialize};
5use anyhow::anyhow;
6use rquickjs::{async_with, CatchResultExt, Object};
7use serde_json::json;
8
9use crate::handler::function::error::FunctionResult;
10use crate::handler::function::function::{Function, HandlerResponse};
11use crate::handler::function::module::console::Log;
12use crate::handler::function::serde::JsValue;
13use crate::handler::node::{NodeRequest, NodeResponse, NodeResult, PartialTraceError};
14use crate::model::{DecisionNodeKind, FunctionNodeContent};
15
16pub(crate) mod error;
17pub(crate) mod function;
18pub(crate) mod listener;
19pub(crate) mod module;
20pub(crate) mod serde;
21
22#[derive(Serialize, Deserialize)]
23pub struct FunctionResponse {
24 performance: String,
25 data: Option<HandlerResponse>,
26}
27
28pub struct FunctionHandler {
29 function: Rc<Function>,
30 trace: bool,
31 iteration: u8,
32 max_depth: u8,
33}
34
/// Upper bound on how long a single function invocation may run (5 seconds) before the
/// interrupt handler installed by `FunctionHandler::handle` starts signalling an interrupt.
static MAX_DURATION: Duration = Duration::from_secs(5);
36
37impl FunctionHandler {
38 pub fn new(function: Rc<Function>, trace: bool, iteration: u8, max_depth: u8) -> Self {
39 Self {
40 function,
41 trace,
42 iteration,
43 max_depth,
44 }
45 }
46
47 pub async fn handle(&self, request: NodeRequest) -> NodeResult {
48 let content = match &request.node.kind {
49 DecisionNodeKind::FunctionNode { content } => match content {
50 FunctionNodeContent::Version2(content) => Ok(content),
51 _ => Err(anyhow!("Unexpected node type")),
52 },
53 _ => Err(anyhow!("Unexpected node type")),
54 }?;
55 let start = std::time::Instant::now();
56
57 let module_name = self
58 .function
59 .suggest_module_name(request.node.id.as_str(), &content.source);
60 let interrupt_handler = Box::new(move || start.elapsed() > MAX_DURATION);
61 self.function
62 .runtime()
63 .set_interrupt_handler(Some(interrupt_handler))
64 .await;
65
66 self.attach_globals()
67 .await
68 .map_err(|e| anyhow!(e.to_string()))?;
69
70 self.function
71 .register_module(&module_name, content.source.as_str())
72 .await
73 .map_err(|e| anyhow!(e.to_string()))?;
74
75 let response_result = self
76 .function
77 .call_handler(&module_name, JsValue(request.input.clone()))
78 .await;
79
80 match response_result {
81 Ok(response) => {
82 self.function.runtime().set_interrupt_handler(None).await;
83
84 Ok(NodeResponse {
85 output: response.data,
86 trace_data: self.trace.then(|| json!({ "log": response.logs })),
87 })
88 }
89 Err(e) => {
90 let mut log = self.function.extract_logs().await;
91 log.push(Log {
92 lines: vec![json!(e.to_string()).to_string()],
93 ms_since_run: start.elapsed().as_millis() as usize,
94 });
95
96 Err(anyhow!(PartialTraceError {
97 message: e.to_string(),
98 trace: Some(json!({ "log": log })),
99 }))
100 }
101 }
102 }
103
104 async fn attach_globals(&self) -> FunctionResult {
105 async_with!(self.function.context() => |ctx| {
106 let config = Object::new(ctx.clone()).catch(&ctx)?;
107
108 config.prop("iteration", self.iteration).catch(&ctx)?;
109 config.prop("maxDepth", self.max_depth).catch(&ctx)?;
110 config.prop("trace", self.trace).catch(&ctx)?;
111
112 ctx.globals().set("config", config).catch(&ctx)?;
113
114 Ok(())
115 })
116 .await
117 }
118}