1use std::path::PathBuf;
2
3use anyhow::{Context, Result, bail};
4use async_trait::async_trait;
5use serde::Deserialize;
6use serde_json::Value;
7
8use crate::config::{parse_config, redact_secret_path};
9use crate::envelope::Envelope;
10use crate::pipeline::ErrorPolicy;
11use crate::transforms::{BasicTransform, MapOne, Transform};
12
13pub mod lua;
14pub mod python;
15pub mod rhai;
16
/// Common interface of the script runtimes in this module.
///
/// `ScriptMapOne` wraps any implementor and forwards `MapOne::map` calls to
/// [`ScriptEngine::run`]. Implementors must be `Send + Sync`.
#[async_trait]
trait ScriptEngine: Send + Sync {
    /// Runs the script against a single envelope.
    ///
    /// The result is returned verbatim from `MapOne::map`.
    /// NOTE(review): `Ok(None)` presumably means "drop this envelope" —
    /// confirm against the `MapOne` contract.
    async fn run(&self, env: Envelope) -> Result<Option<Envelope>>;
}
21
/// Adapter that exposes a script engine through the `MapOne` trait.
///
/// The type parameter is deliberately unbounded here: the `ScriptEngine`
/// requirement lives on the `impl` blocks that actually call into the engine,
/// so the bound is stated once where it is needed instead of being repeated
/// on the data definition.
struct ScriptMapOne<E> {
    /// Identifier reported by `MapOne::id`.
    id: String,
    /// Engine that every `map` call is delegated to.
    engine: E,
}
26
27impl<E: ScriptEngine> ScriptMapOne<E> {
28 fn new(id: impl Into<String>, engine: E) -> Self {
29 Self {
30 id: id.into(),
31 engine,
32 }
33 }
34}
35
36#[async_trait]
37impl<E: ScriptEngine> MapOne for ScriptMapOne<E> {
38 fn id(&self) -> &str {
39 &self.id
40 }
41
42 async fn map(&self, env: Envelope) -> Result<Option<Envelope>> {
43 self.engine.run(env).await
44 }
45}
46
/// Script runtime selected by the `runtime` config key.
///
/// Deserialized from the snake_case variant names: `rhai`, `lua`, `python`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ScriptRuntime {
    /// Scripts are run by `rhai::RhaiEngine`.
    Rhai,
    /// Scripts are run by `lua::LuaEngine`.
    Lua,
    /// Scripts are run by `python::PythonEngine`.
    Python,
}
54
/// Script transform options exactly as deserialized from the component
/// config, before `resolve` validates them.
#[derive(Debug, Clone, Deserialize)]
struct RawScriptTransformConfig {
    /// Runtime that executes the script. Required: it has no serde default.
    runtime: ScriptRuntime,
    /// Inline script source. Mutually exclusive with `script_file`.
    #[serde(default)]
    script: Option<String>,
    /// Path of a file holding the script source. Mutually exclusive with
    /// `script`; the file is read during `resolve`.
    #[serde(default)]
    script_file: Option<PathBuf>,
    /// Entrypoint name; defaults to `"transform"`.
    /// NOTE(review): presumably the script function the engine calls —
    /// confirm in the engine modules.
    #[serde(default = "default_entrypoint")]
    entrypoint: String,
    /// Python executable; rejected by `resolve` unless the runtime is `python`.
    #[serde(default)]
    python_bin: Option<String>,
    /// Rhai-only limit; rejected by `resolve` for the `lua` and `python` runtimes.
    #[serde(default)]
    max_operations: Option<u64>,
    /// Rhai-only limit; rejected by `resolve` for the `lua` and `python` runtimes.
    #[serde(default)]
    max_call_levels: Option<usize>,
    /// Rhai-only limit; rejected by `resolve` for the `lua` and `python` runtimes.
    #[serde(default)]
    max_expr_depth: Option<usize>,
    /// Rhai-only limit; rejected by `resolve` for the `lua` and `python` runtimes.
    #[serde(default)]
    max_function_expr_depth: Option<usize>,
    /// Rhai-only limit; rejected by `resolve` for the `lua` and `python` runtimes.
    #[serde(default)]
    max_variables: Option<usize>,
}
77
/// Python-specific settings; `resolve` only produces this for the `python`
/// runtime.
#[derive(Debug, Clone)]
struct PythonConfig {
    /// Python executable, as returned by `resolve_python_bin` (falls back to
    /// `"python3"` when the option is unset or blank).
    bin: String,
}
82
/// Python executable used when `python_bin` is not configured (or is blank).
fn default_python_bin() -> String {
    String::from("python3")
}
86
87fn resolve_python_bin(bin: Option<String>) -> String {
88 match bin {
89 Some(bin) if !bin.trim().is_empty() => bin,
90 _ => default_python_bin(),
91 }
92}
93
/// Rhai-specific limits; `resolve` only produces this for the `rhai` runtime,
/// filling every unset option from the matching `default_max_*` function.
///
/// NOTE(review): the fields presumably map onto the Rhai engine's limits of
/// the same names — confirm in the `rhai` module.
#[derive(Debug, Clone)]
struct RhaiConfig {
    /// Configured `max_operations`, or `default_max_operations()`.
    max_operations: u64,
    /// Configured `max_call_levels`, or `default_max_call_levels()`.
    max_call_levels: usize,
    /// Configured `max_expr_depth`, or `default_max_expr_depth()`.
    max_expr_depth: usize,
    /// Configured `max_function_expr_depth`, or `default_max_function_expr_depth()`.
    max_function_expr_depth: usize,
    /// Configured `max_variables`, or `default_max_variables()`.
    max_variables: usize,
}
102
/// Validated script transform configuration produced by
/// `RawScriptTransformConfig::resolve` and handed to the engine constructors.
#[derive(Debug, Clone)]
struct ScriptTransformConfig {
    /// Runtime that executes the script.
    runtime: ScriptRuntime,
    /// Script source text, either given inline or loaded from `script_file`.
    script: String,
    /// Entrypoint name taken from the raw config (default `"transform"`).
    entrypoint: String,
    /// `Some` only when `runtime` is `Python`.
    python: Option<PythonConfig>,
    /// `Some` only when `runtime` is `Rhai`.
    rhai: Option<RhaiConfig>,
}
111
112impl RawScriptTransformConfig {
113 fn resolve(self) -> Result<ScriptTransformConfig> {
114 let RawScriptTransformConfig {
115 runtime,
116 script,
117 script_file,
118 entrypoint,
119 python_bin,
120 max_operations,
121 max_call_levels,
122 max_expr_depth,
123 max_function_expr_depth,
124 max_variables,
125 } = self;
126
127 let script = match (script, script_file) {
128 (Some(_), Some(_)) => {
129 bail!("script transform: set either 'script' or 'script_file', not both")
130 }
131 (None, None) => {
132 bail!("script transform: one of 'script' or 'script_file' is required")
133 }
134 (Some(script), None) => script,
135 (None, Some(path)) => std::fs::read_to_string(&path).with_context(|| {
136 format!("failed to read script_file '{}'", redact_secret_path(&path))
137 })?,
138 };
139
140 let has_rhai_limits = max_operations.is_some()
141 || max_call_levels.is_some()
142 || max_expr_depth.is_some()
143 || max_function_expr_depth.is_some()
144 || max_variables.is_some();
145
146 let python = match runtime {
147 ScriptRuntime::Python => {
148 if has_rhai_limits {
149 bail!(
150 "script transform: Rhai-only limits (max_operations, max_call_levels, max_expr_depth, max_function_expr_depth, max_variables) are not supported for runtime 'python'"
151 );
152 }
153 Some(PythonConfig {
154 bin: resolve_python_bin(python_bin),
155 })
156 }
157 ScriptRuntime::Lua => {
158 if has_rhai_limits {
159 bail!(
160 "script transform: Rhai-only limits (max_operations, max_call_levels, max_expr_depth, max_function_expr_depth, max_variables) are not supported for runtime 'lua'"
161 );
162 }
163 if python_bin.is_some() {
164 bail!("script transform: 'python_bin' is only supported for runtime 'python'");
165 }
166 None
167 }
168 ScriptRuntime::Rhai => {
169 if python_bin.is_some() {
170 bail!("script transform: 'python_bin' is only supported for runtime 'python'");
171 }
172 None
173 }
174 };
175
176 let rhai = match runtime {
177 ScriptRuntime::Rhai => Some(RhaiConfig {
178 max_operations: max_operations.unwrap_or_else(default_max_operations),
179 max_call_levels: max_call_levels.unwrap_or_else(default_max_call_levels),
180 max_expr_depth: max_expr_depth.unwrap_or_else(default_max_expr_depth),
181 max_function_expr_depth: max_function_expr_depth
182 .unwrap_or_else(default_max_function_expr_depth),
183 max_variables: max_variables.unwrap_or_else(default_max_variables),
184 }),
185 ScriptRuntime::Lua | ScriptRuntime::Python => None,
186 };
187
188 Ok(ScriptTransformConfig {
189 runtime,
190 script,
191 entrypoint,
192 python,
193 rhai,
194 })
195 }
196}
197
/// Serde default for the `entrypoint` option.
fn default_entrypoint() -> String {
    String::from("transform")
}
201
/// Value used for the Rhai `max_operations` limit when the config omits it.
fn default_max_operations() -> u64 {
    const DEFAULT_MAX_OPERATIONS: u64 = 100_000;
    DEFAULT_MAX_OPERATIONS
}
205
/// Value used for the Rhai `max_call_levels` limit when the config omits it.
fn default_max_call_levels() -> usize {
    const DEFAULT_MAX_CALL_LEVELS: usize = 32;
    DEFAULT_MAX_CALL_LEVELS
}
209
/// Value used for the Rhai `max_expr_depth` limit when the config omits it.
fn default_max_expr_depth() -> usize {
    const DEFAULT_MAX_EXPR_DEPTH: usize = 64;
    DEFAULT_MAX_EXPR_DEPTH
}
213
/// Value used for the Rhai `max_function_expr_depth` limit when the config
/// omits it.
fn default_max_function_expr_depth() -> usize {
    const DEFAULT_MAX_FUNCTION_EXPR_DEPTH: usize = 32;
    DEFAULT_MAX_FUNCTION_EXPR_DEPTH
}
217
/// Value used for the Rhai `max_variables` limit when the config omits it.
fn default_max_variables() -> usize {
    const DEFAULT_MAX_VARIABLES: usize = 64;
    DEFAULT_MAX_VARIABLES
}
221
222pub fn script_transform_factory(
225 id: &str,
226 config: Value,
227 on_error: ErrorPolicy,
228) -> Result<Box<dyn Transform>> {
229 let config: RawScriptTransformConfig = parse_config("script", config)?;
230 let config = config.resolve()?;
231
232 let transform: Box<dyn Transform> = match config.runtime {
233 ScriptRuntime::Rhai => Box::new(
234 BasicTransform::new(ScriptMapOne::new(id, rhai::RhaiEngine::new(&config)?))
235 .with_error_policy(on_error),
236 ),
237 ScriptRuntime::Lua => Box::new(
238 BasicTransform::new(ScriptMapOne::new(id, lua::LuaEngine::new(&config)?))
239 .with_error_policy(on_error),
240 ),
241 ScriptRuntime::Python => Box::new(
242 BasicTransform::new(ScriptMapOne::new(id, python::PythonEngine::new(&config)?))
243 .with_error_policy(on_error),
244 ),
245 };
246
247 Ok(transform)
248}
249
#[cfg(test)]
mod tests {
    use serde_json::{Value, json};

    use crate::Registry;
    use crate::config::{ErrorPolicyConfig, TransformSpec};

    /// Wraps `config` in a spec for the `script` transform kind.
    fn script_spec(config: Value, on_error: Option<ErrorPolicyConfig>) -> TransformSpec {
        TransformSpec {
            kind: "script".into(),
            config,
            on_error,
        }
    }

    /// Builds the transform through a fresh built-in registry and panics if
    /// the build fails.
    #[track_caller]
    fn assert_builds(config: Value, on_error: Option<ErrorPolicyConfig>) {
        let registry = Registry::with_builtins().unwrap();
        registry
            .build_transform("p/t0", script_spec(config, on_error))
            .unwrap();
    }

    /// Builds the transform through a fresh built-in registry, panics with
    /// `expectation` if the build succeeds, and returns the error rendered
    /// with `{:#}`.
    #[track_caller]
    fn build_error(config: Value, expectation: &str) -> String {
        let registry = Registry::with_builtins().unwrap();
        let err = registry
            .build_transform("p/t0", script_spec(config, None))
            .err()
            .expect(expectation);
        format!("{err:#}")
    }

    #[test]
    fn factory_resolves_through_registry() {
        assert_builds(
            json!({
                "runtime": "rhai",
                "script": "fn transform(env) { env }",
            }),
            Some(ErrorPolicyConfig::Drop),
        );
    }

    #[test]
    fn factory_loads_script_from_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("transform.rhai");
        std::fs::write(&path, "fn transform(env) { env }").unwrap();

        assert_builds(
            json!({
                "runtime": "rhai",
                "script_file": path,
            }),
            None,
        );
    }

    #[test]
    fn factory_loads_lua_script_from_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("transform.lua");
        std::fs::write(&path, "function transform(env) return env end").unwrap();

        assert_builds(
            json!({
                "runtime": "lua",
                "script_file": path,
            }),
            None,
        );
    }

    #[test]
    fn factory_rejects_missing_script_source() {
        let msg = build_error(json!({ "runtime": "rhai" }), "expected factory error");
        assert!(
            msg.contains("one of 'script' or 'script_file' is required"),
            "{msg}"
        );
    }

    #[test]
    fn factory_rejects_both_script_and_script_file() {
        let msg = build_error(
            json!({
                "runtime": "rhai",
                "script": "fn transform(env) { env }",
                "script_file": "/tmp/does-not-matter.rhai",
            }),
            "expected factory error",
        );
        assert!(
            msg.contains("set either 'script' or 'script_file', not both"),
            "{msg}"
        );
    }

    #[test]
    fn factory_reports_missing_script_file() {
        let msg = build_error(
            json!({
                "runtime": "rhai",
                "script_file": "/nonexistent/script.rhai",
            }),
            "expected factory error",
        );
        assert!(msg.contains("/nonexistent/script.rhai"), "{msg}");
    }

    #[test]
    fn factory_resolves_lua_through_registry() {
        assert_builds(
            json!({
                "runtime": "lua",
                "script": "function transform(env) return env end",
            }),
            Some(ErrorPolicyConfig::Drop),
        );
    }

    #[test]
    fn factory_requires_runtime() {
        let msg = build_error(
            json!({
                "script": "fn transform(env) { env }",
            }),
            "expected missing runtime error",
        );
        assert!(
            msg.contains("invalid config for component type 'script'"),
            "{msg}"
        );
        assert!(msg.contains("runtime"), "{msg}");
    }

    #[test]
    fn factory_rejects_rhai_limits_for_lua() {
        let msg = build_error(
            json!({
                "runtime": "lua",
                "script": "function transform(env) return env end",
                "max_operations": 1,
            }),
            "expected Lua Rhai-limit validation error",
        );
        assert!(
            msg.contains("Rhai-only limits") && msg.contains("runtime 'lua'"),
            "{msg}"
        );
    }

    #[test]
    fn factory_resolves_python_through_registry() {
        assert_builds(
            json!({
                "runtime": "python",
                "script": "def transform(env):\n return env\n",
            }),
            Some(ErrorPolicyConfig::Drop),
        );
    }

    #[test]
    fn factory_loads_python_script_from_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("transform.py");
        std::fs::write(&path, "def transform(env):\n return env\n").unwrap();

        assert_builds(
            json!({
                "runtime": "python",
                "script_file": path,
            }),
            None,
        );
    }

    #[test]
    fn factory_rejects_rhai_limits_for_python() {
        let msg = build_error(
            json!({
                "runtime": "python",
                "script": "def transform(env):\n return env\n",
                "max_variables": 1,
            }),
            "expected Python Rhai-limit validation error",
        );
        assert!(
            msg.contains("Rhai-only limits") && msg.contains("runtime 'python'"),
            "{msg}"
        );
    }

    #[test]
    fn factory_rejects_python_bin_for_non_python_runtime() {
        let msg = build_error(
            json!({
                "runtime": "lua",
                "script": "function transform(env) return env end",
                "python_bin": "python3",
            }),
            "expected python_bin validation error",
        );
        assert!(
            msg.contains("'python_bin' is only supported for runtime 'python'"),
            "{msg}"
        );
    }

    #[test]
    fn factory_reports_invalid_runtime() {
        let msg = build_error(
            json!({
                "runtime": "bogus",
                "script": "fn transform(env) { env }",
            }),
            "expected invalid runtime error",
        );
        assert!(
            msg.contains("invalid config for component type 'script'"),
            "{msg}"
        );
    }
}