mf_engine/handler/function/
mod.rs1use std::rc::Rc;
2use std::sync::atomic::Ordering;
3use std::time::Duration;
4
5use ::serde::{Deserialize, Serialize};
6use anyhow::anyhow;
7use rquickjs::{async_with, CatchResultExt, Object};
8use serde_json::json;
9
10use crate::handler::function::error::FunctionResult;
11use crate::handler::function::function::{Function, HandlerResponse};
12use crate::handler::function::module::console::Log;
13use crate::handler::function::serde::JsValue;
14use crate::handler::node::{
15 NodeRequest, NodeResponse, NodeResult, PartialTraceError,
16};
17use crate::model::{DecisionNodeKind, FunctionNodeContent};
18use crate::ZEN_CONFIG;
19
20pub(crate) mod error;
21pub(crate) mod function;
22pub(crate) mod listener;
23pub(crate) mod module;
24pub(crate) mod serde;
25
26#[derive(Serialize, Deserialize)]
27pub struct FunctionResponse {
28 performance: String,
29 data: Option<HandlerResponse>,
30}
31
32pub struct FunctionHandler {
33 function: Rc<Function>,
34 trace: bool,
35 iteration: u8,
36 max_depth: u8,
37 max_duration: Duration,
38}
39
40impl FunctionHandler {
41 pub fn new(
42 function: Rc<Function>,
43 trace: bool,
44 iteration: u8,
45 max_depth: u8,
46 ) -> Self {
47 let max_duration_millis =
48 ZEN_CONFIG.function_timeout_millis.load(Ordering::Relaxed);
49
50 Self {
51 function,
52 trace,
53 iteration,
54 max_depth,
55 max_duration: Duration::from_millis(max_duration_millis),
56 }
57 }
58
59 pub async fn handle(
60 &self,
61 request: NodeRequest,
62 ) -> NodeResult {
63 let content = match &request.node.kind {
64 DecisionNodeKind::FunctionNode { content } => match content {
65 FunctionNodeContent::Version2(content) => Ok(content),
66 _ => Err(anyhow!("Unexpected node type")),
67 },
68 _ => Err(anyhow!("Unexpected node type")),
69 }?;
70
71 let start = std::time::Instant::now();
72 if content.omit_nodes {
73 request.input.dot_remove("$nodes");
74 }
75
76 let module_name = self
77 .function
78 .suggest_module_name(request.node.id.as_str(), &content.source);
79
80 let max_duration = self.max_duration.clone();
81 let interrupt_handler =
82 Box::new(move || start.elapsed() > max_duration);
83 self.function
84 .runtime()
85 .set_interrupt_handler(Some(interrupt_handler))
86 .await;
87
88 self.attach_globals().await.map_err(|e| anyhow!(e.to_string()))?;
89
90 self.function
91 .register_module(&module_name, content.source.as_str())
92 .await
93 .map_err(|e| anyhow!(e.to_string()))?;
94
95 let response_result = self
96 .function
97 .call_handler(&module_name, JsValue(request.input.clone()))
98 .await;
99
100 match response_result {
101 Ok(response) => {
102 self.function.runtime().set_interrupt_handler(None).await;
103
104 Ok(NodeResponse {
105 output: response.data,
106 trace_data: self
107 .trace
108 .then(|| json!({ "log": response.logs })),
109 })
110 },
111 Err(e) => {
112 let mut log = self.function.extract_logs().await;
113 log.push(Log {
114 lines: vec![json!(e.to_string()).to_string()],
115 ms_since_run: start.elapsed().as_millis() as usize,
116 });
117
118 Err(anyhow!(PartialTraceError {
119 message: e.to_string(),
120 trace: Some(json!({ "log": log })),
121 }))
122 },
123 }
124 }
125
126 async fn attach_globals(&self) -> FunctionResult {
127 async_with!(self.function.context() => |ctx| {
128 let config = Object::new(ctx.clone()).catch(&ctx)?;
129
130 config.prop("iteration", self.iteration).catch(&ctx)?;
131 config.prop("maxDepth", self.max_depth).catch(&ctx)?;
132 config.prop("trace", self.trace).catch(&ctx)?;
133
134 ctx.globals().set("config", config).catch(&ctx)?;
135
136 Ok(())
137 })
138 .await
139 }
140}