1use super::{
56 ImportPolicy, WorkflowJSRuntime, WorkflowModuleInput, WorkflowModuleOutput,
57 WorkflowRuntimeCall, WorkflowRuntimeExecution, WorkflowRuntimePoll, WorkflowRuntimeRequest,
58 WorkflowRuntimeRequestResolution,
59};
60use anyhow::{anyhow, bail, Context as AnyhowContext};
61use rquickjs::{
62 context::intrinsic,
63 loader::{Loader, Resolver},
64 module::Declared,
65 object::{Accessor, Property},
66 prelude::{Func, MutFn, Opt, Rest},
67 promise::PromiseState,
68 CatchResultExt, CaughtError, Context, Error as RQuickJSError, Exception, Function, Module,
69 Object, Persistent, Promise, Runtime, Undefined, Value,
70};
71use std::{
72 cell::RefCell,
73 collections::{HashMap, VecDeque},
74 rc::Rc,
75 sync::{Arc, Mutex},
76 time::{Duration, Instant},
77};
78
/// Intrinsic set requested for the restricted QuickJS context.
///
/// `start_module` passes this tuple to `Context::custom`, so only the intrinsics
/// listed here are asked for when the context is built.
// NOTE(review): `intrinsic::Eval` is enabled even though the `eval` global is later
// blocked via `BLOCKED_GLOBALS` — presumably required for module evaluation; confirm.
type WorkflowIntrinsics = (
    intrinsic::Eval,
    intrinsic::Json,
    intrinsic::Promise,
    intrinsic::Proxy,
    intrinsic::MapSet,
    intrinsic::RegExp,
);

/// Module specifier served from the embedded `assets/workflow_extra.js`.
const WORKFLOW_EXTRA_MODULE: &str = "workflow:extra";
/// Module specifier served from the embedded `assets/workflow_sandbox.js`.
const WORKFLOW_SANDBOX_MODULE: &str = "workflow:sandbox";
/// Default upper bound for a single `sleep(ms)` request: 365 days in milliseconds.
const DEFAULT_MAX_SLEEP_MS: u64 = 365 * 24 * 60 * 60 * 1000;

/// Global names that `harden_workflow_sandbox` redefines as read-only `undefined`.
const BLOCKED_GLOBALS: &[&str] = &[
    "eval",
    "Function",
    "AsyncFunction",
    "Date",
    "fetch",
    "XMLHttpRequest",
    "WebSocket",
    "EventSource",
    "navigator",
    "location",
    "Deno",
    "Bun",
    "process",
    "require",
    "Buffer",
    "__dirname",
    "__filename",
];

/// Globals that are read during setup and then overwritten with `undefined` by
/// `hide_internal_globals`.
const INTERNAL_GLOBALS: &[&str] = &["__readonly"];
116
117#[derive(Debug)]
118struct WorkflowModuleResolver;
119
120impl Resolver for WorkflowModuleResolver {
121 fn resolve<'js>(
122 &mut self,
123 _ctx: &rquickjs::Ctx<'js>,
124 base: &str,
125 name: &str,
126 ) -> rquickjs::Result<String> {
127 match name {
128 WORKFLOW_EXTRA_MODULE => Ok(WORKFLOW_EXTRA_MODULE.to_string()),
129 WORKFLOW_SANDBOX_MODULE => Ok(WORKFLOW_SANDBOX_MODULE.to_string()),
130 _ => Err(RQuickJSError::new_resolving_message(
131 base,
132 name,
133 "workflow imports are restricted; only workflow:extra and workflow:sandbox are available",
134 )),
135 }
136 }
137}
138
139#[derive(Debug)]
140struct WorkflowModuleLoader;
141
142impl Loader for WorkflowModuleLoader {
143 fn load<'js>(
144 &mut self,
145 ctx: &rquickjs::Ctx<'js>,
146 name: &str,
147 ) -> rquickjs::Result<Module<'js, Declared>> {
148 match name {
149 WORKFLOW_EXTRA_MODULE => Module::declare(
150 ctx.clone(),
151 WORKFLOW_EXTRA_MODULE,
152 include_str!("assets/workflow_extra.js"),
153 ),
154 WORKFLOW_SANDBOX_MODULE => Module::declare(
155 ctx.clone(),
156 WORKFLOW_SANDBOX_MODULE,
157 include_str!("assets/workflow_sandbox.js"),
158 ),
159 _ => Err(RQuickJSError::new_loading_message(
160 name,
161 "workflow imports are restricted; only workflow:extra and workflow:sandbox are available",
162 )),
163 }
164 }
165}
166
167#[derive(Debug, Clone, Copy)]
169pub struct RQuickJSWorkflowRuntime {
170 max_sleep_ms: u64,
171}
172
173impl Default for RQuickJSWorkflowRuntime {
174 fn default() -> Self {
175 Self::new()
176 }
177}
178
179impl RQuickJSWorkflowRuntime {
180 pub fn new() -> Self {
181 Self {
182 max_sleep_ms: DEFAULT_MAX_SLEEP_MS,
183 }
184 }
185
186 pub fn with_max_sleep_ms(mut self, max_sleep_ms: u64) -> Self {
187 self.max_sleep_ms = max_sleep_ms;
188 self
189 }
190}
191
impl WorkflowJSRuntime for RQuickJSWorkflowRuntime {
    /// Creates a dedicated QuickJS runtime and restricted context for `input`,
    /// then starts evaluating the workflow module.
    ///
    /// # Errors
    /// Fails when the import policy is not `ImportPolicy::DenyAll`, when the
    /// runtime or the restricted context cannot be created, or when
    /// `RQuickJSWorkflowExecution::start` fails.
    fn start_module(
        &self,
        input: WorkflowModuleInput,
    ) -> anyhow::Result<Box<dyn WorkflowRuntimeExecution>> {
        log::debug!(
            "quickjs start_module source={} args_type={} budget_total={:?} budget_spent={}",
            input.source_name,
            json_value_type(&input.args),
            input.budget.total,
            input.budget.spent
        );
        // Only the deny-all policy is implemented; the resolver/loader below
        // still serve the two built-in `workflow:*` modules.
        if input.sandbox.import_policy != ImportPolicy::DenyAll {
            bail!("unsupported workflow import policy");
        }

        let runtime = Runtime::new().context("failed to create QuickJS runtime")?;
        runtime.set_memory_limit(input.sandbox.memory_limit_bytes);
        runtime.set_max_stack_size(input.sandbox.max_stack_size_bytes);
        runtime.set_loader(WorkflowModuleResolver, WorkflowModuleLoader);

        // The deadline is shared between the execution (which refreshes it) and
        // the interrupt handler (which reads it).
        let timeout = input.sandbox.timeout;
        let deadline = Arc::new(Mutex::new(Instant::now() + timeout));
        let interrupt_deadline = Arc::clone(&deadline);
        // The handler reports `true` once the deadline has passed, and also when
        // the lock is poisoned.
        // NOTE(review): presumably `true` asks QuickJS to interrupt execution — confirm.
        runtime.set_interrupt_handler(Some(Box::new(move || match interrupt_deadline.lock() {
            Ok(deadline) => Instant::now() >= *deadline,
            Err(_) => true,
        })));

        let context = Context::custom::<WorkflowIntrinsics>(&runtime)
            .context("failed to create restricted QuickJS context")?;

        let mut execution = RQuickJSWorkflowExecution {
            state: Rc::new(RefCell::new(RuntimeState {
                max_sleep_ms: self.max_sleep_ms,
                ..RuntimeState::default()
            })),
            // The persistent handles below are filled in by `start` and later polls.
            module_namespace: None,
            module_eval_promise: None,
            workflow_promise: None,
            readonly: None,
            context,
            runtime,
            deadline,
            timeout,
        };
        execution.start(input)?;
        Ok(Box::new(execution))
    }
}
242
/// One in-flight workflow execution with its own QuickJS runtime and context.
struct RQuickJSWorkflowExecution {
    /// State shared with the native functions installed into the JS globals.
    state: Rc<RefCell<RuntimeState>>,
    /// Namespace of the workflow module; stored by `evaluate_module`.
    module_namespace: Option<Persistent<Object<'static>>>,
    /// Promise returned by evaluating the workflow module; stored by `evaluate_module`.
    module_eval_promise: Option<Persistent<Promise<'static>>>,
    /// Promise tracking the default export's result; stored by `start_default_export`.
    workflow_promise: Option<Persistent<Promise<'static>>>,
    /// The `__readonly` helper function; stored by `start`.
    readonly: Option<Persistent<Function<'static>>>,
    /// Context in which all workflow JavaScript runs.
    context: Context,
    // The field is never read after construction, hence the lint allowance.
    // NOTE(review): presumably held so the runtime lives as long as the execution — confirm.
    #[allow(dead_code)]
    runtime: Runtime,
    /// Instant read by the interrupt handler; rewritten by `refresh_deadline`.
    deadline: Arc<Mutex<Instant>>,
    /// Duration added to "now" each time the deadline is refreshed.
    timeout: Duration,
}
256
/// Mutable state shared between the execution and the native JS callbacks.
#[derive(Default)]
struct RuntimeState {
    /// Calls (`log`, `phase`) queued by JS and handed out one per `poll`.
    calls: VecDeque<WorkflowRuntimeCall>,
    /// Requests queued by `create_pending_request`, awaiting resolution by the host.
    requests: VecDeque<WorkflowRuntimeRequest>,
    /// Resolve/reject functions for each outstanding request, keyed by request id.
    pending_requests: HashMap<String, PendingRequest>,
    /// Counter incremented before each id is issued, so the first id is "1".
    next_request_id: u64,
    /// Name passed to the most recent `phase(...)` call, if any.
    current_phase: Option<String>,
    /// Latest budget snapshot exposed to JS through the `budget` global.
    budget: super::WorkflowBudgetSnapshot,
    /// Upper bound accepted by `sleep(ms)`.
    max_sleep_ms: u64,
}
267
/// The settle functions of the promise returned to JS for one outstanding request.
#[derive(Clone)]
struct PendingRequest {
    /// Called by `resolve_request` with the value on success.
    resolve: Persistent<Function<'static>>,
    /// Called by `resolve_request` with an `Error` on failure.
    reject: Persistent<Function<'static>>,
}
273
274impl RQuickJSWorkflowExecution {
275 fn start(&mut self, input: WorkflowModuleInput) -> anyhow::Result<()> {
276 let context = self.context.clone();
277 context.with(|ctx| -> anyhow::Result<()> {
278 evaluate_sandbox_prelude(&ctx)?;
279
280 let RuntimeGlobals {
281 source_name,
282 source,
283 readonly,
284 } = install_runtime_globals(&ctx, input, Rc::clone(&self.state))?;
285 self.readonly = Some(readonly);
286 self.evaluate_module(&ctx, source_name, source)?;
287 Ok(())
288 })
289 }
290
291 fn evaluate_module(
292 &mut self,
293 ctx: &rquickjs::Ctx<'_>,
294 source_name: String,
295 source: String,
296 ) -> anyhow::Result<()> {
297 log::debug!("quickjs evaluate_module source={source_name}");
298 let module = Module::declare(ctx.clone(), source_name, source)
299 .catch(ctx)
300 .map_err(|error| anyhow!("failed to declare workflow module: {error:?}"))?;
301 let (module, promise) = module
302 .eval()
303 .catch(ctx)
304 .map_err(|error| anyhow!("failed to evaluate workflow module: {error:?}"))?;
305 let namespace = module
306 .namespace()
307 .context("failed to get workflow module namespace")?;
308
309 self.module_namespace = Some(Persistent::save(ctx, namespace));
310 self.module_eval_promise = Some(Persistent::save(ctx, promise));
311 Ok(())
312 }
313
314 fn refresh_deadline(&self) -> anyhow::Result<()> {
315 let mut deadline = self
316 .deadline
317 .lock()
318 .map_err(|_| anyhow!("QuickJS interrupt deadline lock was poisoned"))?;
319 *deadline = Instant::now() + self.timeout;
320 Ok(())
321 }
322
323 fn drain_jobs(&self) -> anyhow::Result<()> {
324 self.refresh_deadline()?;
325 self.context.with(|ctx| while ctx.execute_pending_job() {});
326 Ok(())
327 }
328}
329
330impl WorkflowRuntimeExecution for RQuickJSWorkflowExecution {
331 fn poll(&mut self) -> anyhow::Result<WorkflowRuntimePoll> {
332 self.drain_jobs()?;
333
334 let context = self.context.clone();
335 context.with(|ctx| -> anyhow::Result<WorkflowRuntimePoll> {
336 if let Some(call) = self.state.borrow_mut().calls.pop_front() {
337 return Ok(WorkflowRuntimePoll::Call(call));
338 }
339
340 if let Some(request) = self.state.borrow().requests.front().cloned() {
341 return Ok(WorkflowRuntimePoll::Request(request));
342 }
343
344 if self.workflow_promise.is_none() {
345 match self.module_eval_state(&ctx)? {
346 PromiseState::Pending => return Ok(WorkflowRuntimePoll::Pending),
347 PromiseState::Rejected => {
348 bail!(
349 "workflow module evaluation rejected: {}",
350 self.module_eval_rejection_message(&ctx)
351 )
352 }
353 PromiseState::Resolved => self.start_default_export(&ctx)?,
354 }
355 }
356
357 self.poll_workflow_promise(&ctx)
358 })
359 }
360
361 fn take_pending_requests(&mut self) -> anyhow::Result<Vec<WorkflowRuntimeRequest>> {
362 self.drain_jobs()?;
363 Ok(self.state.borrow_mut().requests.drain(..).collect())
364 }
365
366 fn resolve_request(
367 &mut self,
368 id: &str,
369 resolution: WorkflowRuntimeRequestResolution,
370 ) -> anyhow::Result<()> {
371 let resolution_json = match resolution {
372 WorkflowRuntimeRequestResolution::Ok(value) => serde_json::json!({
373 "ok": true,
374 "value": value,
375 }),
376 WorkflowRuntimeRequestResolution::OkUndefined => serde_json::json!({
377 "ok": true,
378 "undefined": true,
379 }),
380 WorkflowRuntimeRequestResolution::OkWithBudget { value, budget } => {
381 self.state.borrow_mut().budget = budget;
382 serde_json::json!({
383 "ok": true,
384 "value": value,
385 })
386 }
387 WorkflowRuntimeRequestResolution::Err { message } => serde_json::json!({
388 "ok": false,
389 "message": message,
390 }),
391 };
392
393 self.refresh_deadline()?;
394 self.context.with(|ctx| -> anyhow::Result<()> {
395 let pending = self
396 .state
397 .borrow()
398 .pending_requests
399 .get(id)
400 .cloned()
401 .ok_or_else(|| anyhow!("unknown workflow request id: {id}"))?;
402 let resolution = rquickjs_serde::to_value(ctx.clone(), &resolution_json)
403 .context("failed to convert workflow request resolution to QuickJS value")?;
404 let resolution_object: Object<'_> = resolution
405 .as_object()
406 .cloned()
407 .ok_or_else(|| anyhow!("request resolution was not an object"))?;
408 let ok = resolution_object
409 .get::<_, bool>("ok")
410 .context("failed to read request resolution status")?;
411
412 let resolved = if ok {
413 let value: Value<'_> = if resolution_object
414 .get::<_, bool>("undefined")
415 .unwrap_or(false)
416 {
417 Undefined.into_value(ctx.clone())
418 } else {
419 resolution_object
420 .get("value")
421 .context("failed to read request resolution value")?
422 };
423 let resolve = pending
424 .resolve
425 .restore(&ctx)
426 .context("failed to restore request resolver")?;
427 resolve
428 .call::<_, ()>((value,))
429 .catch(&ctx)
430 .map_err(|error| anyhow!("failed to resolve workflow request: {error:?}"))
431 } else {
432 let message = resolution_object
433 .get::<_, String>("message")
434 .unwrap_or_else(|_| "workflow request rejected".to_string());
435 let error_constructor: Function = ctx
436 .globals()
437 .get("Error")
438 .context("failed to get Error constructor")?;
439 let error_value: Value<'_> = error_constructor
440 .call((message,))
441 .catch(&ctx)
442 .map_err(|error| {
443 anyhow!("failed to construct request rejection error: {error:?}")
444 })?;
445 let reject = pending
446 .reject
447 .restore(&ctx)
448 .context("failed to restore request rejecter")?;
449 reject
450 .call::<_, ()>((error_value,))
451 .catch(&ctx)
452 .map_err(|error| anyhow!("failed to reject workflow request: {error:?}"))
453 };
454
455 if resolved.is_ok() {
456 let mut state = self.state.borrow_mut();
457 state.pending_requests.remove(id);
458 state.requests.retain(|request| request.id() != id);
459 }
460
461 resolved
462 })
463 }
464}
465
466impl RQuickJSWorkflowExecution {
467 fn module_eval_state(&self, ctx: &rquickjs::Ctx<'_>) -> anyhow::Result<PromiseState> {
468 let promise = self
469 .module_eval_promise
470 .clone()
471 .ok_or_else(|| anyhow!("workflow module evaluation was not started"))?
472 .restore(ctx)
473 .context("failed to restore workflow module evaluation promise")?;
474 Ok(promise.state())
475 }
476
477 fn module_eval_rejection_message(&self, ctx: &rquickjs::Ctx<'_>) -> String {
478 if let Some(promise) = self
479 .module_eval_promise
480 .clone()
481 .and_then(|promise| promise.restore(ctx).ok())
482 {
483 let _ = promise.result::<Value<'_>>();
484 }
485 js_exception_message(ctx)
486 }
487
488 fn start_default_export(&mut self, ctx: &rquickjs::Ctx<'_>) -> anyhow::Result<()> {
489 let namespace = self
490 .module_namespace
491 .clone()
492 .ok_or_else(|| anyhow!("workflow module namespace is missing"))?
493 .restore(ctx)
494 .context("failed to restore workflow module namespace")?;
495 if !namespace
496 .contains_key("default")
497 .context("failed to inspect workflow module default export")?
498 {
499 bail!("workflow module must default export a workflow result or function");
500 }
501 let default_export: Value<'_> = namespace
502 .get("default")
503 .context("workflow module must default export a workflow result or function")?;
504 let promise = start_default_export(ctx, default_export)
505 .context("failed to start workflow default export")?;
506 self.workflow_promise = Some(Persistent::save(ctx, promise));
507 Ok(())
508 }
509
510 fn poll_workflow_promise(
511 &self,
512 ctx: &rquickjs::Ctx<'_>,
513 ) -> anyhow::Result<WorkflowRuntimePoll> {
514 let promise = self
515 .workflow_promise
516 .clone()
517 .ok_or_else(|| anyhow!("workflow default execution was not started"))?
518 .restore(ctx)
519 .context("failed to restore workflow promise")?;
520
521 match promise.state() {
522 PromiseState::Pending => Ok(WorkflowRuntimePoll::Pending),
523 PromiseState::Rejected => {
524 let _ = promise.result::<Value<'_>>();
525 bail!("workflow module rejected: {}", js_exception_message(ctx))
526 }
527 PromiseState::Resolved => {
528 let result = promise
529 .result::<Value<'_>>()
530 .ok_or_else(|| anyhow!("workflow promise resolved without a result"))?
531 .catch(ctx)
532 .map_err(|error| anyhow!("failed to read workflow result: {error:?}"))?;
533 let result = rquickjs_serde::from_value::<serde_json::Value>(result)
534 .context("failed to convert workflow result from QuickJS value")?;
535 Ok(WorkflowRuntimePoll::Complete(WorkflowModuleOutput {
536 result,
537 }))
538 }
539 }
540 }
541}
542
543fn json_value_type(value: &serde_json::Value) -> &'static str {
544 match value {
545 serde_json::Value::Null => "null",
546 serde_json::Value::Bool(_) => "boolean",
547 serde_json::Value::Number(_) => "number",
548 serde_json::Value::String(_) => "string",
549 serde_json::Value::Array(_) => "array",
550 serde_json::Value::Object(_) => "object",
551 }
552}
553
554fn evaluate_sandbox_prelude(ctx: &rquickjs::Ctx<'_>) -> anyhow::Result<()> {
555 let module = Module::declare(
556 ctx.clone(),
557 "smol:workflow-sandbox-prelude".to_string(),
558 include_str!("assets/sandbox_prelude.js").to_string(),
559 )
560 .catch(ctx)
561 .map_err(|error| anyhow!("failed to declare sandbox prelude: {error:?}"))?;
562 let (_module, promise) = module
563 .eval()
564 .catch(ctx)
565 .map_err(|error| anyhow!("failed to evaluate sandbox prelude: {error:?}"))?;
566
567 while promise.state() == PromiseState::Pending {
568 if !ctx.execute_pending_job() {
569 bail!("sandbox prelude did not complete");
570 }
571 }
572
573 if promise.state() == PromiseState::Rejected {
574 let _ = promise.result::<Value<'_>>();
575 bail!("sandbox prelude rejected: {}", js_exception_message(ctx));
576 }
577
578 Ok(())
579}
580
581fn js_exception_message(ctx: &rquickjs::Ctx<'_>) -> String {
582 let error = ctx.catch();
583 if let Some(object) = error.as_object() {
584 let message = object
585 .get::<_, String>("message")
586 .ok()
587 .filter(|message| !message.is_empty());
588 let stack = object
589 .get::<_, String>("stack")
590 .ok()
591 .filter(|stack| !stack.is_empty());
592
593 match (message, stack) {
594 (Some(message), Some(stack)) if stack.contains(&message) => return stack,
595 (Some(message), Some(stack)) => return format!("{message}\n{stack}"),
596 (Some(message), None) => return message,
597 (None, Some(stack)) => return stack,
598 (None, None) => {}
599 }
600 }
601
602 if let Ok(value) = rquickjs_serde::from_value::<serde_json::Value>(error.clone()) {
603 return match value {
604 serde_json::Value::String(message) if !message.is_empty() => message,
605 other => other.to_string(),
606 };
607 }
608
609 format!("{error:?}")
610}
611
/// Installs every workflow-facing global into `ctx` and locks the sandbox down.
///
/// Steps, in order: store the initial budget in `state`; install read-only
/// `args` and `budget` globals; install the native functions; re-define the
/// public globals as read-only proxies; block the names in `BLOCKED_GLOBALS`
/// and `Math.random`; finally hide the internal helper globals.
///
/// Returns the module source/name taken from `input` together with the saved
/// `__readonly` helper.
fn install_runtime_globals<'js>(
    ctx: &rquickjs::Ctx<'js>,
    input: WorkflowModuleInput,
    state: Rc<RefCell<RuntimeState>>,
) -> anyhow::Result<RuntimeGlobals> {
    let globals = ctx.globals();

    let WorkflowModuleInput {
        source,
        source_name,
        args,
        budget,
        sandbox: _,
    } = input;

    state.borrow_mut().budget = budget;

    let args = rquickjs_serde::to_value(ctx.clone(), &args)
        .context("failed to convert workflow args to QuickJS value")?;
    // `__readonly` is saved now because `hide_internal_globals` overwrites it below.
    // NOTE(review): presumably defined by the sandbox prelude — confirm.
    let readonly: Function = globals
        .get("__readonly")
        .context("failed to get readonly helper")?;
    let readonly = Persistent::save(ctx, readonly);
    let readonly_args =
        readonly_proxy(ctx, &readonly, args).context("failed to wrap workflow args as readonly")?;
    define_readonly_data_property(ctx, &globals, "args", readonly_args, true)
        .context("failed to install readonly workflow args global")?;

    let budget = create_budget_object(ctx, Rc::clone(&state))?;
    let budget = readonly_proxy(ctx, &readonly, budget.into())
        .context("failed to wrap workflow budget as readonly")?;
    define_readonly_data_property(ctx, &globals, "budget", budget, true)
        .context("failed to install workflow budget global")?;

    // The native functions are installed as configurable properties so the
    // next step can re-define them as read-only proxies.
    install_native_workflow_functions(&globals, state)?;
    harden_public_workflow_globals(ctx, &globals, &readonly)?;

    harden_workflow_sandbox(ctx, &globals)?;
    hide_internal_globals(&globals);

    Ok(RuntimeGlobals {
        source_name,
        source,
        readonly,
    })
}
658
/// What `install_runtime_globals` hands back to `RQuickJSWorkflowExecution::start`.
struct RuntimeGlobals {
    /// Module name taken from the workflow input.
    source_name: String,
    /// Module source text taken from the workflow input.
    source: String,
    /// Saved handle to the `__readonly` helper function.
    readonly: Persistent<Function<'static>>,
}
664
665fn start_default_export<'js>(
666 ctx: &rquickjs::Ctx<'js>,
667 default_export: Value<'js>,
668) -> anyhow::Result<Promise<'js>> {
669 let globals = ctx.globals();
670 let result = if let Some(default_function) = default_export.as_function().cloned() {
671 let args: Value<'js> = globals
672 .get("args")
673 .context("failed to get workflow args global")?;
674 let workflow_context = create_workflow_context_object(ctx, &globals)?;
675 default_function
676 .call::<_, Value<'js>>((args, workflow_context))
677 .catch(ctx)
678 } else {
679 Ok(default_export)
680 };
681
682 let (promise, resolve, reject) =
683 Promise::new(ctx).context("failed to create workflow promise")?;
684 match result {
685 Ok(value) => resolve
686 .call::<_, ()>((value,))
687 .catch(ctx)
688 .map_err(|error| anyhow!("failed to resolve workflow promise: {error:?}"))?,
689 Err(CaughtError::Exception(error)) => reject
690 .call::<_, ()>((error.into_value(),))
691 .catch(ctx)
692 .map_err(|error| anyhow!("failed to reject workflow promise: {error:?}"))?,
693 Err(CaughtError::Value(error)) => reject
694 .call::<_, ()>((error,))
695 .catch(ctx)
696 .map_err(|error| anyhow!("failed to reject workflow promise: {error:?}"))?,
697 Err(CaughtError::Error(error)) => {
698 return Err(anyhow!("failed to call workflow default export: {error:?}"));
699 }
700 }
701 Ok(promise)
702}
703
704fn create_workflow_context_object<'js>(
705 ctx: &rquickjs::Ctx<'js>,
706 globals: &Object<'js>,
707) -> anyhow::Result<Object<'js>> {
708 let workflow_context = Object::new(ctx.clone()).context("failed to create workflow context")?;
709 for name in [
710 "args", "agent", "parallel", "pipeline", "workflow", "budget", "log", "phase",
711 ] {
712 let value: Value<'js> = globals
713 .get(name)
714 .with_context(|| format!("failed to get workflow context value {name}"))?;
715 workflow_context
716 .prop(name, Property::from(value).enumerable())
717 .with_context(|| format!("failed to install workflow context value {name}"))?;
718 }
719
720 let sw: Object<'js> = globals
721 .get("SW")
722 .context("failed to get workflow context SW namespace")?;
723 let extra: Value<'js> = sw
724 .get("extra")
725 .context("failed to get workflow context extra namespace")?;
726 workflow_context
727 .prop("extra", Property::from(extra).enumerable())
728 .context("failed to install workflow context extra namespace")?;
729
730 Ok(workflow_context)
731}
732
733fn readonly_proxy<'js>(
734 ctx: &rquickjs::Ctx<'js>,
735 readonly: &Persistent<Function<'static>>,
736 value: Value<'js>,
737) -> anyhow::Result<Value<'js>> {
738 let readonly = readonly
739 .clone()
740 .restore(ctx)
741 .context("failed to restore readonly proxy helper")?;
742 readonly
743 .call((value,))
744 .catch(ctx)
745 .map_err(|error| anyhow!("failed to create readonly proxy: {error:?}"))
746}
747
748fn harden_public_workflow_globals<'js>(
749 ctx: &rquickjs::Ctx<'js>,
750 globals: &Object<'js>,
751 readonly: &Persistent<Function<'static>>,
752) -> anyhow::Result<()> {
753 for name in [
754 "agent", "workflow", "log", "phase", "parallel", "pipeline", "SW",
755 ] {
756 let value: Value<'js> = globals
757 .get(name)
758 .with_context(|| format!("failed to get workflow global {name}"))?;
759 let value = readonly_proxy(ctx, readonly, value)
760 .with_context(|| format!("failed to wrap workflow global {name} as readonly"))?;
761 define_readonly_data_property(ctx, globals, name, value, true)
762 .with_context(|| format!("failed to harden workflow global {name}"))?;
763 }
764 Ok(())
765}
766
767fn harden_workflow_sandbox<'js>(
768 ctx: &rquickjs::Ctx<'js>,
769 globals: &Object<'js>,
770) -> anyhow::Result<()> {
771 let math: Object<'_> = globals.get("Math").context("failed to get Math global")?;
772 let random = Function::new(
773 ctx.clone(),
774 |ctx: rquickjs::Ctx<'_>| -> rquickjs::Result<()> {
775 Err(Exception::throw_message(
776 &ctx,
777 "Math.random is disabled in smol workflow sandbox",
778 ))
779 },
780 )
781 .context("failed to create disabled Math.random function")?;
782 define_readonly_data_property(ctx, &math, "random", random.into_value(), false)
783 .context("failed to replace Math.random")?;
784
785 for name in BLOCKED_GLOBALS {
786 define_readonly_data_property(ctx, globals, name, Undefined.into_value(ctx.clone()), false)
787 .with_context(|| format!("failed to block workflow global {name}"))?;
788 }
789
790 Ok(())
791}
792
793fn hide_internal_globals<'js>(globals: &Object<'js>) {
794 for name in INTERNAL_GLOBALS {
795 let _ = define_readonly_data_property(
796 globals.ctx(),
797 globals,
798 name,
799 Undefined.into_value(globals.ctx().clone()),
800 false,
801 );
802 }
803}
804
805fn define_readonly_data_property<'js>(
806 ctx: &rquickjs::Ctx<'js>,
807 target: &Object<'js>,
808 name: &str,
809 value: Value<'js>,
810 enumerable: bool,
811) -> anyhow::Result<()> {
812 let descriptor = Object::new(ctx.clone()).context("failed to create property descriptor")?;
813 descriptor
814 .set("value", value)
815 .context("failed to set property descriptor value")?;
816 descriptor
817 .set("writable", false)
818 .context("failed to set property descriptor writable flag")?;
819 descriptor
820 .set("configurable", false)
821 .context("failed to set property descriptor configurable flag")?;
822 descriptor
823 .set("enumerable", enumerable)
824 .context("failed to set property descriptor enumerable flag")?;
825
826 let object: Object<'js> = ctx
827 .globals()
828 .get("Object")
829 .context("failed to get Object")?;
830 let define_property: Function<'js> = object
831 .get("defineProperty")
832 .context("failed to get Object.defineProperty")?;
833 define_property
834 .call::<_, ()>((target.clone(), name, descriptor))
835 .catch(ctx)
836 .map_err(|error| anyhow!("Object.defineProperty failed for {name}: {error:?}"))
837}
838
/// Builds the JS `budget` object backed by the live budget snapshot in `state`.
///
/// - `total` is an accessor property that converts `budget.total` on each read.
/// - `spent` is a function returning `budget.spent`.
/// - `remaining` is a function returning `total - spent` (saturating), or
///   `Infinity` when no total is set.
fn create_budget_object<'js>(
    ctx: &rquickjs::Ctx<'js>,
    state: Rc<RefCell<RuntimeState>>,
) -> anyhow::Result<Object<'js>> {
    let object = Object::new(ctx.clone()).context("failed to create workflow budget object")?;

    // Each closure owns its own handle to the shared state.
    let total_state = Rc::clone(&state);
    object
        .prop(
            "total",
            Accessor::from(
                move |ctx: rquickjs::Ctx<'js>| -> rquickjs::Result<Value<'js>> {
                    rquickjs_serde::to_value(ctx, total_state.borrow().budget.total).map_err(
                        |error| rquickjs::Error::IntoJs {
                            from: "WorkflowBudgetSnapshot.total",
                            to: "value",
                            message: Some(error.to_string()),
                        },
                    )
                },
            )
            .enumerable(),
        )
        .context("failed to install workflow budget total")?;

    let spent_state = Rc::clone(&state);
    object
        .prop(
            "spent",
            Property::from(Func::from(move || spent_state.borrow().budget.spent)).enumerable(),
        )
        .context("failed to install workflow budget spent function")?;

    object
        .prop(
            "remaining",
            Property::from(Func::from(move || {
                let budget = &state.borrow().budget;
                match budget.total {
                    // Saturating subtraction keeps an overspent budget at zero.
                    Some(total) => total.saturating_sub(budget.spent) as f64,
                    None => f64::INFINITY,
                }
            }))
            .enumerable(),
        )
        .context("failed to install workflow budget remaining function")?;

    Ok(object)
}
888
889fn install_native_workflow_functions<'js>(
890 globals: &Object<'js>,
891 state: Rc<RefCell<RuntimeState>>,
892) -> anyhow::Result<()> {
893 let log_state = Rc::clone(&state);
894 globals
895 .prop(
896 "log",
897 Property::from(Func::from(MutFn::from(move |values: Rest<Value<'js>>| {
898 let values = values
899 .0
900 .into_iter()
901 .map(rquickjs_serde::from_value::<serde_json::Value>)
902 .collect::<Result<Vec<_>, _>>()
903 .map_err(|error| rquickjs::Error::FromJs {
904 from: "value",
905 to: "serde_json::Value",
906 message: Some(error.to_string()),
907 })?;
908 log_state
909 .borrow_mut()
910 .calls
911 .push_back(WorkflowRuntimeCall::Log { values });
912 Ok::<(), rquickjs::Error>(())
913 })))
914 .enumerable()
915 .configurable(),
916 )
917 .context("failed to install workflow log global")?;
918
919 let phase_state = Rc::clone(&state);
920 globals
921 .prop(
922 "phase",
923 Property::from(Func::from(MutFn::from(
924 move |name: String, options: Opt<Value<'js>>| {
925 let options = match options.0 {
926 Some(value) => Some(
927 rquickjs_serde::from_value::<serde_json::Value>(value).map_err(
928 |error| rquickjs::Error::FromJs {
929 from: "value",
930 to: "serde_json::Value",
931 message: Some(error.to_string()),
932 },
933 )?,
934 ),
935 None => None,
936 };
937 let mut state = phase_state.borrow_mut();
938 state.current_phase = Some(name.clone());
939 state
940 .calls
941 .push_back(WorkflowRuntimeCall::Phase { name, options });
942 Ok::<(), rquickjs::Error>(())
943 },
944 )))
945 .enumerable()
946 .configurable(),
947 )
948 .context("failed to install workflow phase global")?;
949
950 let agent_state = Rc::clone(&state);
951 globals
952 .prop(
953 "agent",
954 Property::from(Func::from(MutFn::from(
955 move |ctx: rquickjs::Ctx<'js>, prompt: String, options: Opt<Value<'js>>| {
956 let options = match options.0 {
957 Some(value) => Some(
958 rquickjs_serde::from_value::<serde_json::Value>(value).map_err(
959 |error| rquickjs::Error::FromJs {
960 from: "value",
961 to: "serde_json::Value",
962 message: Some(error.to_string()),
963 },
964 )?,
965 ),
966 None => None,
967 };
968 create_pending_request(&ctx, &agent_state, |id, state| {
969 let mut options = options.unwrap_or_else(|| serde_json::json!({}));
970 if let Some(current_phase) = state.current_phase.clone() {
971 if options.get("phase").is_none() {
972 options["phase"] = serde_json::Value::String(current_phase);
973 }
974 }
975 let options = if options.as_object().is_some_and(|object| object.is_empty())
976 {
977 None
978 } else {
979 Some(options)
980 };
981 WorkflowRuntimeRequest::Agent {
982 id,
983 prompt,
984 options,
985 }
986 })
987 },
988 )))
989 .enumerable()
990 .configurable(),
991 )
992 .context("failed to install workflow agent global")?;
993
994 let workflow_state = Rc::clone(&state);
995 globals
996 .prop(
997 "workflow",
998 Property::from(Func::from(MutFn::from(
999 move |ctx: rquickjs::Ctx<'js>, workflow_ref: Value<'js>, args: Opt<Value<'js>>| {
1000 let workflow_ref = rquickjs_serde::from_value::<super::WorkflowRef>(
1001 workflow_ref,
1002 )
1003 .map_err(|error| rquickjs::Error::FromJs {
1004 from: "value",
1005 to: "WorkflowRef",
1006 message: Some(error.to_string()),
1007 })?;
1008 let args = match args.0 {
1009 Some(value) => Some(
1010 rquickjs_serde::from_value::<serde_json::Value>(value).map_err(
1011 |error| rquickjs::Error::FromJs {
1012 from: "value",
1013 to: "serde_json::Value",
1014 message: Some(error.to_string()),
1015 },
1016 )?,
1017 ),
1018 None => None,
1019 };
1020 create_pending_request(&ctx, &workflow_state, |id, _state| {
1021 WorkflowRuntimeRequest::Workflow {
1022 id,
1023 workflow_ref,
1024 args,
1025 }
1026 })
1027 },
1028 )))
1029 .enumerable()
1030 .configurable(),
1031 )
1032 .context("failed to install workflow child workflow global")?;
1033
1034 let sw = create_sw_object(globals.ctx(), state)?;
1035 globals
1036 .prop("SW", Property::from(sw).enumerable().configurable())
1037 .context("failed to install workflow SW global")?;
1038
1039 Ok(())
1040}
1041
1042fn create_sw_object<'js>(
1043 ctx: &rquickjs::Ctx<'js>,
1044 state: Rc<RefCell<RuntimeState>>,
1045) -> anyhow::Result<Object<'js>> {
1046 let sw = Object::new(ctx.clone()).context("failed to create workflow SW object")?;
1047 let extra = create_extra_object(ctx, Rc::clone(&state))?;
1048 sw.prop("extra", Property::from(extra).enumerable())
1049 .context("failed to install workflow SW.extra object")?;
1050 let sandbox = create_sandbox_object(ctx, state)?;
1051 sw.prop("sandbox", Property::from(sandbox).enumerable())
1052 .context("failed to install workflow SW.sandbox object")?;
1053 Ok(sw)
1054}
1055
1056fn create_extra_object<'js>(
1057 ctx: &rquickjs::Ctx<'js>,
1058 state: Rc<RefCell<RuntimeState>>,
1059) -> anyhow::Result<Object<'js>> {
1060 let extra = Object::new(ctx.clone()).context("failed to create workflow extra object")?;
1061 let sleep_state = Rc::clone(&state);
1062 extra
1063 .prop(
1064 "sleep",
1065 Property::from(Func::from(MutFn::from(
1066 move |ctx: rquickjs::Ctx<'js>, duration: Value<'js>| {
1067 let max_sleep_ms = sleep_state.borrow().max_sleep_ms;
1068 let duration_ms = validate_sleep_duration(&ctx, &duration, max_sleep_ms)?;
1069 create_pending_request(&ctx, &sleep_state, |id, _state| {
1070 WorkflowRuntimeRequest::Sleep { id, duration_ms }
1071 })
1072 },
1073 )))
1074 .enumerable(),
1075 )
1076 .context("failed to install workflow extra sleep function")?;
1077 Ok(extra)
1078}
1079
1080fn create_sandbox_object<'js>(
1081 ctx: &rquickjs::Ctx<'js>,
1082 state: Rc<RefCell<RuntimeState>>,
1083) -> anyhow::Result<Object<'js>> {
1084 let sandbox = Object::new(ctx.clone()).context("failed to create workflow sandbox object")?;
1085 let exec_state = Rc::clone(&state);
1086 sandbox
1087 .prop(
1088 "exec",
1089 Property::from(Func::from(MutFn::from(
1090 move |ctx: rquickjs::Ctx<'js>, profile: String, request: Value<'js>| {
1091 let request =
1092 rquickjs_serde::from_value::<super::WorkflowSandboxExecRequest>(request)
1093 .map_err(|error| rquickjs::Error::FromJs {
1094 from: "value",
1095 to: "WorkflowSandboxExecRequest",
1096 message: Some(error.to_string()),
1097 })?;
1098 create_pending_request(&ctx, &exec_state, |id, _state| {
1099 WorkflowRuntimeRequest::SandboxExec {
1100 id,
1101 profile,
1102 request,
1103 }
1104 })
1105 },
1106 )))
1107 .enumerable(),
1108 )
1109 .context("failed to install workflow sandbox exec function")?;
1110 Ok(sandbox)
1111}
1112
1113fn validate_sleep_duration<'js>(
1114 ctx: &rquickjs::Ctx<'js>,
1115 value: &Value<'js>,
1116 max_sleep_ms: u64,
1117) -> rquickjs::Result<u64> {
1118 let Some(number) = value.as_number() else {
1119 return Err(Exception::throw_message(
1120 ctx,
1121 "sleep(ms) requires a finite non-negative number",
1122 ));
1123 };
1124 if !number.is_finite() || number < 0.0 {
1125 return Err(Exception::throw_message(
1126 ctx,
1127 "sleep(ms) requires a finite non-negative number",
1128 ));
1129 }
1130 let duration_ms = number.ceil();
1131 if duration_ms > max_sleep_ms as f64 {
1132 return Err(Exception::throw_message(
1133 ctx,
1134 "sleep(ms) duration exceeds the maximum allowed delay",
1135 ));
1136 }
1137 Ok(duration_ms as u64)
1138}
1139
1140fn create_pending_request<'js>(
1141 ctx: &rquickjs::Ctx<'js>,
1142 state: &Rc<RefCell<RuntimeState>>,
1143 make_request: impl FnOnce(String, &mut RuntimeState) -> WorkflowRuntimeRequest,
1144) -> rquickjs::Result<Promise<'js>> {
1145 let (promise, resolve, reject) = ctx.promise()?;
1146 let mut state = state.borrow_mut();
1147 state.next_request_id += 1;
1148 let id = state.next_request_id.to_string();
1149 let request = make_request(id.clone(), &mut state);
1150 log::debug!("quickjs queued request id={} kind={}", id, request.kind());
1151 state.pending_requests.insert(
1152 id,
1153 PendingRequest {
1154 resolve: Persistent::save(ctx, resolve),
1155 reject: Persistent::save(ctx, reject),
1156 },
1157 );
1158 state.requests.push_back(request);
1159 Ok(promise)
1160}
1161
#[cfg(test)]
mod tests {
    use super::*;
    use crate::js_runtime::{WorkflowModuleInput, WorkflowRuntimePoll};
    use serde_json::json;

    /// A module whose default export is a plain object completes with that
    /// object, and the `args` global reflects the input arguments.
    #[test]
    fn executes_default_export_object() {
        let mut execution = RQuickJSWorkflowRuntime::new()
            .start_module(WorkflowModuleInput::new(
                r#"
export const meta = { name: "inline", description: "inline" };
export default { ok: true, args };
"#,
                "inline.workflow.js",
                json!({ "value": 1 }),
            ))
            .expect("workflow should start");

        // Poll until completion; any call or request is unexpected for this module.
        let output = loop {
            match execution.poll().expect("workflow should poll") {
                WorkflowRuntimePoll::Complete(output) => break output,
                WorkflowRuntimePoll::Pending => continue,
                other => panic!("expected completion, got {other:?}"),
            }
        };

        assert_eq!(output.result, json!({ "ok": true, "args": { "value": 1 } }));
    }
}