1use super::{
56 ImportPolicy, WorkflowJSRuntime, WorkflowModuleInput, WorkflowModuleOutput,
57 WorkflowRuntimeCall, WorkflowRuntimeExecution, WorkflowRuntimePoll, WorkflowRuntimeRequest,
58 WorkflowRuntimeRequestResolution,
59};
60use anyhow::{anyhow, bail, Context as AnyhowContext};
61use rquickjs::{
62 context::intrinsic,
63 loader::{Loader, Resolver},
64 module::Declared,
65 object::{Accessor, Property},
66 prelude::{Func, MutFn, Opt, Rest},
67 promise::PromiseState,
68 CatchResultExt, CaughtError, Context, Error as RQuickJSError, Exception, Function, Module,
69 Object, Persistent, Promise, Runtime, Undefined, Value,
70};
71use std::{
72 cell::RefCell,
73 collections::{HashMap, VecDeque},
74 rc::Rc,
75 sync::{Arc, Mutex},
76 time::{Duration, Instant},
77};
78
79type WorkflowIntrinsics = (
80 intrinsic::Eval,
81 intrinsic::Json,
82 intrinsic::Promise,
83 intrinsic::Proxy,
84 intrinsic::MapSet,
85 intrinsic::RegExp,
86);
87
88const WORKFLOW_EXTRA_MODULE: &str = "workflow:extra";
89const DEFAULT_MAX_SLEEP_MS: u64 = 365 * 24 * 60 * 60 * 1000;
93
94const BLOCKED_GLOBALS: &[&str] = &[
95 "eval",
96 "Function",
97 "AsyncFunction",
98 "Date",
99 "fetch",
100 "XMLHttpRequest",
101 "WebSocket",
102 "EventSource",
103 "navigator",
104 "location",
105 "Deno",
106 "Bun",
107 "process",
108 "require",
109 "Buffer",
110 "__dirname",
111 "__filename",
112];
113
114const INTERNAL_GLOBALS: &[&str] = &["__readonly"];
115
116#[derive(Debug)]
117struct WorkflowExtraResolver;
118
119impl Resolver for WorkflowExtraResolver {
120 fn resolve<'js>(
121 &mut self,
122 _ctx: &rquickjs::Ctx<'js>,
123 base: &str,
124 name: &str,
125 ) -> rquickjs::Result<String> {
126 if name == WORKFLOW_EXTRA_MODULE {
127 Ok(WORKFLOW_EXTRA_MODULE.to_string())
128 } else {
129 Err(RQuickJSError::new_resolving_message(
130 base,
131 name,
132 "workflow imports are restricted; only workflow:extra is available",
133 ))
134 }
135 }
136}
137
138#[derive(Debug)]
139struct WorkflowExtraLoader;
140
141impl Loader for WorkflowExtraLoader {
142 fn load<'js>(
143 &mut self,
144 ctx: &rquickjs::Ctx<'js>,
145 name: &str,
146 ) -> rquickjs::Result<Module<'js, Declared>> {
147 if name != WORKFLOW_EXTRA_MODULE {
148 return Err(RQuickJSError::new_loading_message(
149 name,
150 "workflow imports are restricted; only workflow:extra is available",
151 ));
152 }
153 Module::declare(
154 ctx.clone(),
155 WORKFLOW_EXTRA_MODULE,
156 include_str!("rquickjs_js/workflow_extra.js"),
157 )
158 }
159}
160
161#[derive(Debug, Clone, Copy)]
163pub struct RQuickJSWorkflowRuntime {
164 max_sleep_ms: u64,
165}
166
167impl Default for RQuickJSWorkflowRuntime {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
173impl RQuickJSWorkflowRuntime {
174 pub fn new() -> Self {
175 Self {
176 max_sleep_ms: DEFAULT_MAX_SLEEP_MS,
177 }
178 }
179
180 pub fn with_max_sleep_ms(mut self, max_sleep_ms: u64) -> Self {
181 self.max_sleep_ms = max_sleep_ms;
182 self
183 }
184}
185
186impl WorkflowJSRuntime for RQuickJSWorkflowRuntime {
187 fn start_module(
188 &self,
189 input: WorkflowModuleInput,
190 ) -> anyhow::Result<Box<dyn WorkflowRuntimeExecution>> {
191 log::debug!(
192 "quickjs start_module source={} args_type={} budget_total={:?} budget_spent={}",
193 input.source_name,
194 json_value_type(&input.args),
195 input.budget.total,
196 input.budget.spent
197 );
198 if input.sandbox.import_policy != ImportPolicy::DenyAll {
199 bail!("unsupported workflow import policy");
200 }
201
202 let runtime = Runtime::new().context("failed to create QuickJS runtime")?;
203 runtime.set_memory_limit(input.sandbox.memory_limit_bytes);
204 runtime.set_max_stack_size(input.sandbox.max_stack_size_bytes);
205 runtime.set_loader(WorkflowExtraResolver, WorkflowExtraLoader);
206
207 let timeout = input.sandbox.timeout;
208 let deadline = Arc::new(Mutex::new(Instant::now() + timeout));
209 let interrupt_deadline = Arc::clone(&deadline);
210 runtime.set_interrupt_handler(Some(Box::new(move || match interrupt_deadline.lock() {
211 Ok(deadline) => Instant::now() >= *deadline,
212 Err(_) => true,
213 })));
214
215 let context = Context::custom::<WorkflowIntrinsics>(&runtime)
216 .context("failed to create restricted QuickJS context")?;
217
218 let mut execution = RQuickJSWorkflowExecution {
219 state: Rc::new(RefCell::new(RuntimeState {
220 max_sleep_ms: self.max_sleep_ms,
221 ..RuntimeState::default()
222 })),
223 module_namespace: None,
224 module_eval_promise: None,
225 workflow_promise: None,
226 readonly: None,
227 context,
228 runtime,
229 deadline,
230 timeout,
231 };
232 execution.start(input)?;
233 Ok(Box::new(execution))
234 }
235}
236
237struct RQuickJSWorkflowExecution {
238 state: Rc<RefCell<RuntimeState>>,
240 module_namespace: Option<Persistent<Object<'static>>>,
241 module_eval_promise: Option<Persistent<Promise<'static>>>,
242 workflow_promise: Option<Persistent<Promise<'static>>>,
243 readonly: Option<Persistent<Function<'static>>>,
244 context: Context,
245 #[allow(dead_code)]
246 runtime: Runtime,
247 deadline: Arc<Mutex<Instant>>,
248 timeout: Duration,
249}
250
251#[derive(Default)]
252struct RuntimeState {
253 calls: VecDeque<WorkflowRuntimeCall>,
254 requests: VecDeque<WorkflowRuntimeRequest>,
255 pending_requests: HashMap<String, PendingRequest>,
256 next_request_id: u64,
257 current_phase: Option<String>,
258 budget: super::WorkflowBudgetSnapshot,
259 max_sleep_ms: u64,
260}
261
262#[derive(Clone)]
263struct PendingRequest {
264 resolve: Persistent<Function<'static>>,
265 reject: Persistent<Function<'static>>,
266}
267
268impl RQuickJSWorkflowExecution {
269 fn start(&mut self, input: WorkflowModuleInput) -> anyhow::Result<()> {
270 let context = self.context.clone();
271 context.with(|ctx| -> anyhow::Result<()> {
272 evaluate_sandbox_prelude(&ctx)?;
273
274 let RuntimeGlobals {
275 source_name,
276 source,
277 readonly,
278 } = install_runtime_globals(&ctx, input, Rc::clone(&self.state))?;
279 self.readonly = Some(readonly);
280 self.evaluate_module(&ctx, source_name, source)?;
281 Ok(())
282 })
283 }
284
285 fn evaluate_module(
286 &mut self,
287 ctx: &rquickjs::Ctx<'_>,
288 source_name: String,
289 source: String,
290 ) -> anyhow::Result<()> {
291 log::debug!("quickjs evaluate_module source={source_name}");
292 let module = Module::declare(ctx.clone(), source_name, source)
293 .catch(ctx)
294 .map_err(|error| anyhow!("failed to declare workflow module: {error:?}"))?;
295 let (module, promise) = module
296 .eval()
297 .catch(ctx)
298 .map_err(|error| anyhow!("failed to evaluate workflow module: {error:?}"))?;
299 let namespace = module
300 .namespace()
301 .context("failed to get workflow module namespace")?;
302
303 self.module_namespace = Some(Persistent::save(ctx, namespace));
304 self.module_eval_promise = Some(Persistent::save(ctx, promise));
305 Ok(())
306 }
307
308 fn refresh_deadline(&self) -> anyhow::Result<()> {
309 let mut deadline = self
310 .deadline
311 .lock()
312 .map_err(|_| anyhow!("QuickJS interrupt deadline lock was poisoned"))?;
313 *deadline = Instant::now() + self.timeout;
314 Ok(())
315 }
316
317 fn drain_jobs(&self) -> anyhow::Result<()> {
318 self.refresh_deadline()?;
319 self.context.with(|ctx| while ctx.execute_pending_job() {});
320 Ok(())
321 }
322}
323
324impl WorkflowRuntimeExecution for RQuickJSWorkflowExecution {
325 fn poll(&mut self) -> anyhow::Result<WorkflowRuntimePoll> {
326 self.drain_jobs()?;
327
328 let context = self.context.clone();
329 context.with(|ctx| -> anyhow::Result<WorkflowRuntimePoll> {
330 if let Some(call) = self.state.borrow_mut().calls.pop_front() {
331 return Ok(WorkflowRuntimePoll::Call(call));
332 }
333
334 if let Some(request) = self.state.borrow().requests.front().cloned() {
335 return Ok(WorkflowRuntimePoll::Request(request));
336 }
337
338 if self.workflow_promise.is_none() {
339 match self.module_eval_state(&ctx)? {
340 PromiseState::Pending => return Ok(WorkflowRuntimePoll::Pending),
341 PromiseState::Rejected => {
342 bail!(
343 "workflow module evaluation rejected: {}",
344 self.module_eval_rejection_message(&ctx)
345 )
346 }
347 PromiseState::Resolved => self.start_default_export(&ctx)?,
348 }
349 }
350
351 self.poll_workflow_promise(&ctx)
352 })
353 }
354
355 fn take_pending_requests(&mut self) -> anyhow::Result<Vec<WorkflowRuntimeRequest>> {
356 self.drain_jobs()?;
357 Ok(self.state.borrow_mut().requests.drain(..).collect())
358 }
359
360 fn resolve_request(
361 &mut self,
362 id: &str,
363 resolution: WorkflowRuntimeRequestResolution,
364 ) -> anyhow::Result<()> {
365 let resolution_json = match resolution {
366 WorkflowRuntimeRequestResolution::Ok(value) => serde_json::json!({
367 "ok": true,
368 "value": value,
369 }),
370 WorkflowRuntimeRequestResolution::OkUndefined => serde_json::json!({
371 "ok": true,
372 "undefined": true,
373 }),
374 WorkflowRuntimeRequestResolution::OkWithBudget { value, budget } => {
375 self.state.borrow_mut().budget = budget;
376 serde_json::json!({
377 "ok": true,
378 "value": value,
379 })
380 }
381 WorkflowRuntimeRequestResolution::Err { message } => serde_json::json!({
382 "ok": false,
383 "message": message,
384 }),
385 };
386
387 self.refresh_deadline()?;
388 self.context.with(|ctx| -> anyhow::Result<()> {
389 let pending = self
390 .state
391 .borrow()
392 .pending_requests
393 .get(id)
394 .cloned()
395 .ok_or_else(|| anyhow!("unknown workflow request id: {id}"))?;
396 let resolution = rquickjs_serde::to_value(ctx.clone(), &resolution_json)
397 .context("failed to convert workflow request resolution to QuickJS value")?;
398 let resolution_object: Object<'_> = resolution
399 .as_object()
400 .cloned()
401 .ok_or_else(|| anyhow!("request resolution was not an object"))?;
402 let ok = resolution_object
403 .get::<_, bool>("ok")
404 .context("failed to read request resolution status")?;
405
406 let resolved = if ok {
407 let value: Value<'_> = if resolution_object
408 .get::<_, bool>("undefined")
409 .unwrap_or(false)
410 {
411 Undefined.into_value(ctx.clone())
412 } else {
413 resolution_object
414 .get("value")
415 .context("failed to read request resolution value")?
416 };
417 let resolve = pending
418 .resolve
419 .restore(&ctx)
420 .context("failed to restore request resolver")?;
421 resolve
422 .call::<_, ()>((value,))
423 .catch(&ctx)
424 .map_err(|error| anyhow!("failed to resolve workflow request: {error:?}"))
425 } else {
426 let message = resolution_object
427 .get::<_, String>("message")
428 .unwrap_or_else(|_| "workflow request rejected".to_string());
429 let error_constructor: Function = ctx
430 .globals()
431 .get("Error")
432 .context("failed to get Error constructor")?;
433 let error_value: Value<'_> = error_constructor
434 .call((message,))
435 .catch(&ctx)
436 .map_err(|error| {
437 anyhow!("failed to construct request rejection error: {error:?}")
438 })?;
439 let reject = pending
440 .reject
441 .restore(&ctx)
442 .context("failed to restore request rejecter")?;
443 reject
444 .call::<_, ()>((error_value,))
445 .catch(&ctx)
446 .map_err(|error| anyhow!("failed to reject workflow request: {error:?}"))
447 };
448
449 if resolved.is_ok() {
450 let mut state = self.state.borrow_mut();
451 state.pending_requests.remove(id);
452 state.requests.retain(|request| request.id() != id);
453 }
454
455 resolved
456 })
457 }
458}
459
460impl RQuickJSWorkflowExecution {
461 fn module_eval_state(&self, ctx: &rquickjs::Ctx<'_>) -> anyhow::Result<PromiseState> {
462 let promise = self
463 .module_eval_promise
464 .clone()
465 .ok_or_else(|| anyhow!("workflow module evaluation was not started"))?
466 .restore(ctx)
467 .context("failed to restore workflow module evaluation promise")?;
468 Ok(promise.state())
469 }
470
471 fn module_eval_rejection_message(&self, ctx: &rquickjs::Ctx<'_>) -> String {
472 if let Some(promise) = self
473 .module_eval_promise
474 .clone()
475 .and_then(|promise| promise.restore(ctx).ok())
476 {
477 let _ = promise.result::<Value<'_>>();
478 }
479 js_exception_message(ctx)
480 }
481
482 fn start_default_export(&mut self, ctx: &rquickjs::Ctx<'_>) -> anyhow::Result<()> {
483 let namespace = self
484 .module_namespace
485 .clone()
486 .ok_or_else(|| anyhow!("workflow module namespace is missing"))?
487 .restore(ctx)
488 .context("failed to restore workflow module namespace")?;
489 if !namespace
490 .contains_key("default")
491 .context("failed to inspect workflow module default export")?
492 {
493 bail!("workflow module must default export a workflow result or function");
494 }
495 let default_export: Value<'_> = namespace
496 .get("default")
497 .context("workflow module must default export a workflow result or function")?;
498 let promise = start_default_export(ctx, default_export)
499 .context("failed to start workflow default export")?;
500 self.workflow_promise = Some(Persistent::save(ctx, promise));
501 Ok(())
502 }
503
504 fn poll_workflow_promise(
505 &self,
506 ctx: &rquickjs::Ctx<'_>,
507 ) -> anyhow::Result<WorkflowRuntimePoll> {
508 let promise = self
509 .workflow_promise
510 .clone()
511 .ok_or_else(|| anyhow!("workflow default execution was not started"))?
512 .restore(ctx)
513 .context("failed to restore workflow promise")?;
514
515 match promise.state() {
516 PromiseState::Pending => Ok(WorkflowRuntimePoll::Pending),
517 PromiseState::Rejected => {
518 let _ = promise.result::<Value<'_>>();
519 bail!("workflow module rejected: {}", js_exception_message(ctx))
520 }
521 PromiseState::Resolved => {
522 let result = promise
523 .result::<Value<'_>>()
524 .ok_or_else(|| anyhow!("workflow promise resolved without a result"))?
525 .catch(ctx)
526 .map_err(|error| anyhow!("failed to read workflow result: {error:?}"))?;
527 let result = rquickjs_serde::from_value::<serde_json::Value>(result)
528 .context("failed to convert workflow result from QuickJS value")?;
529 Ok(WorkflowRuntimePoll::Complete(WorkflowModuleOutput {
530 result,
531 }))
532 }
533 }
534 }
535}
536
537fn json_value_type(value: &serde_json::Value) -> &'static str {
538 match value {
539 serde_json::Value::Null => "null",
540 serde_json::Value::Bool(_) => "boolean",
541 serde_json::Value::Number(_) => "number",
542 serde_json::Value::String(_) => "string",
543 serde_json::Value::Array(_) => "array",
544 serde_json::Value::Object(_) => "object",
545 }
546}
547
548fn evaluate_sandbox_prelude(ctx: &rquickjs::Ctx<'_>) -> anyhow::Result<()> {
549 let module = Module::declare(
550 ctx.clone(),
551 "smol:workflow-sandbox-prelude".to_string(),
552 include_str!("rquickjs_js/sandbox_prelude.js").to_string(),
553 )
554 .catch(ctx)
555 .map_err(|error| anyhow!("failed to declare sandbox prelude: {error:?}"))?;
556 let (_module, promise) = module
557 .eval()
558 .catch(ctx)
559 .map_err(|error| anyhow!("failed to evaluate sandbox prelude: {error:?}"))?;
560
561 while promise.state() == PromiseState::Pending {
562 if !ctx.execute_pending_job() {
563 bail!("sandbox prelude did not complete");
564 }
565 }
566
567 if promise.state() == PromiseState::Rejected {
568 let _ = promise.result::<Value<'_>>();
569 bail!("sandbox prelude rejected: {}", js_exception_message(ctx));
570 }
571
572 Ok(())
573}
574
575fn js_exception_message(ctx: &rquickjs::Ctx<'_>) -> String {
576 let error = ctx.catch();
577 if let Some(object) = error.as_object() {
578 let message = object
579 .get::<_, String>("message")
580 .ok()
581 .filter(|message| !message.is_empty());
582 let stack = object
583 .get::<_, String>("stack")
584 .ok()
585 .filter(|stack| !stack.is_empty());
586
587 match (message, stack) {
588 (Some(message), Some(stack)) if stack.contains(&message) => return stack,
589 (Some(message), Some(stack)) => return format!("{message}\n{stack}"),
590 (Some(message), None) => return message,
591 (None, Some(stack)) => return stack,
592 (None, None) => {}
593 }
594 }
595
596 if let Ok(value) = rquickjs_serde::from_value::<serde_json::Value>(error.clone()) {
597 return match value {
598 serde_json::Value::String(message) if !message.is_empty() => message,
599 other => other.to_string(),
600 };
601 }
602
603 format!("{error:?}")
604}
605
606fn install_runtime_globals<'js>(
607 ctx: &rquickjs::Ctx<'js>,
608 input: WorkflowModuleInput,
609 state: Rc<RefCell<RuntimeState>>,
610) -> anyhow::Result<RuntimeGlobals> {
611 let globals = ctx.globals();
612
613 let WorkflowModuleInput {
614 source,
615 source_name,
616 args,
617 budget,
618 sandbox: _,
619 } = input;
620
621 state.borrow_mut().budget = budget;
622
623 let args = rquickjs_serde::to_value(ctx.clone(), &args)
624 .context("failed to convert workflow args to QuickJS value")?;
625 let readonly: Function = globals
626 .get("__readonly")
627 .context("failed to get readonly helper")?;
628 let readonly = Persistent::save(ctx, readonly);
629 let readonly_args =
630 readonly_proxy(ctx, &readonly, args).context("failed to wrap workflow args as readonly")?;
631 define_readonly_data_property(ctx, &globals, "args", readonly_args, true)
632 .context("failed to install readonly workflow args global")?;
633
634 let budget = create_budget_object(ctx, Rc::clone(&state))?;
635 let budget = readonly_proxy(ctx, &readonly, budget.into())
636 .context("failed to wrap workflow budget as readonly")?;
637 define_readonly_data_property(ctx, &globals, "budget", budget, true)
638 .context("failed to install workflow budget global")?;
639
640 install_native_workflow_functions(&globals, state)?;
641 harden_public_workflow_globals(ctx, &globals, &readonly)?;
642
643 harden_workflow_sandbox(ctx, &globals)?;
644 hide_internal_globals(&globals);
645
646 Ok(RuntimeGlobals {
647 source_name,
648 source,
649 readonly,
650 })
651}
652
653struct RuntimeGlobals {
654 source_name: String,
655 source: String,
656 readonly: Persistent<Function<'static>>,
657}
658
659fn start_default_export<'js>(
660 ctx: &rquickjs::Ctx<'js>,
661 default_export: Value<'js>,
662) -> anyhow::Result<Promise<'js>> {
663 let globals = ctx.globals();
664 let result = if let Some(default_function) = default_export.as_function().cloned() {
665 let args: Value<'js> = globals
666 .get("args")
667 .context("failed to get workflow args global")?;
668 let workflow_context = create_workflow_context_object(ctx, &globals)?;
669 default_function
670 .call::<_, Value<'js>>((args, workflow_context))
671 .catch(ctx)
672 } else {
673 Ok(default_export)
674 };
675
676 let (promise, resolve, reject) =
677 Promise::new(ctx).context("failed to create workflow promise")?;
678 match result {
679 Ok(value) => resolve
680 .call::<_, ()>((value,))
681 .catch(ctx)
682 .map_err(|error| anyhow!("failed to resolve workflow promise: {error:?}"))?,
683 Err(CaughtError::Exception(error)) => reject
684 .call::<_, ()>((error.into_value(),))
685 .catch(ctx)
686 .map_err(|error| anyhow!("failed to reject workflow promise: {error:?}"))?,
687 Err(CaughtError::Value(error)) => reject
688 .call::<_, ()>((error,))
689 .catch(ctx)
690 .map_err(|error| anyhow!("failed to reject workflow promise: {error:?}"))?,
691 Err(CaughtError::Error(error)) => {
692 return Err(anyhow!("failed to call workflow default export: {error:?}"));
693 }
694 }
695 Ok(promise)
696}
697
698fn create_workflow_context_object<'js>(
699 ctx: &rquickjs::Ctx<'js>,
700 globals: &Object<'js>,
701) -> anyhow::Result<Object<'js>> {
702 let workflow_context = Object::new(ctx.clone()).context("failed to create workflow context")?;
703 for name in [
704 "args", "agent", "parallel", "pipeline", "workflow", "budget", "log", "phase",
705 ] {
706 let value: Value<'js> = globals
707 .get(name)
708 .with_context(|| format!("failed to get workflow context value {name}"))?;
709 workflow_context
710 .prop(name, Property::from(value).enumerable())
711 .with_context(|| format!("failed to install workflow context value {name}"))?;
712 }
713
714 let sw: Object<'js> = globals
715 .get("SW")
716 .context("failed to get workflow context SW namespace")?;
717 let extra: Value<'js> = sw
718 .get("extra")
719 .context("failed to get workflow context extra namespace")?;
720 workflow_context
721 .prop("extra", Property::from(extra).enumerable())
722 .context("failed to install workflow context extra namespace")?;
723
724 Ok(workflow_context)
725}
726
727fn readonly_proxy<'js>(
728 ctx: &rquickjs::Ctx<'js>,
729 readonly: &Persistent<Function<'static>>,
730 value: Value<'js>,
731) -> anyhow::Result<Value<'js>> {
732 let readonly = readonly
733 .clone()
734 .restore(ctx)
735 .context("failed to restore readonly proxy helper")?;
736 readonly
737 .call((value,))
738 .catch(ctx)
739 .map_err(|error| anyhow!("failed to create readonly proxy: {error:?}"))
740}
741
742fn harden_public_workflow_globals<'js>(
743 ctx: &rquickjs::Ctx<'js>,
744 globals: &Object<'js>,
745 readonly: &Persistent<Function<'static>>,
746) -> anyhow::Result<()> {
747 for name in [
748 "agent", "workflow", "log", "phase", "parallel", "pipeline", "SW",
749 ] {
750 let value: Value<'js> = globals
751 .get(name)
752 .with_context(|| format!("failed to get workflow global {name}"))?;
753 let value = readonly_proxy(ctx, readonly, value)
754 .with_context(|| format!("failed to wrap workflow global {name} as readonly"))?;
755 define_readonly_data_property(ctx, globals, name, value, true)
756 .with_context(|| format!("failed to harden workflow global {name}"))?;
757 }
758 Ok(())
759}
760
761fn harden_workflow_sandbox<'js>(
762 ctx: &rquickjs::Ctx<'js>,
763 globals: &Object<'js>,
764) -> anyhow::Result<()> {
765 let math: Object<'_> = globals.get("Math").context("failed to get Math global")?;
766 let random = Function::new(
767 ctx.clone(),
768 |ctx: rquickjs::Ctx<'_>| -> rquickjs::Result<()> {
769 Err(Exception::throw_message(
770 &ctx,
771 "Math.random is disabled in smol workflow sandbox",
772 ))
773 },
774 )
775 .context("failed to create disabled Math.random function")?;
776 define_readonly_data_property(ctx, &math, "random", random.into_value(), false)
777 .context("failed to replace Math.random")?;
778
779 for name in BLOCKED_GLOBALS {
780 define_readonly_data_property(ctx, globals, name, Undefined.into_value(ctx.clone()), false)
781 .with_context(|| format!("failed to block workflow global {name}"))?;
782 }
783
784 Ok(())
785}
786
787fn hide_internal_globals<'js>(globals: &Object<'js>) {
788 for name in INTERNAL_GLOBALS {
789 let _ = define_readonly_data_property(
790 globals.ctx(),
791 globals,
792 name,
793 Undefined.into_value(globals.ctx().clone()),
794 false,
795 );
796 }
797}
798
799fn define_readonly_data_property<'js>(
800 ctx: &rquickjs::Ctx<'js>,
801 target: &Object<'js>,
802 name: &str,
803 value: Value<'js>,
804 enumerable: bool,
805) -> anyhow::Result<()> {
806 let descriptor = Object::new(ctx.clone()).context("failed to create property descriptor")?;
807 descriptor
808 .set("value", value)
809 .context("failed to set property descriptor value")?;
810 descriptor
811 .set("writable", false)
812 .context("failed to set property descriptor writable flag")?;
813 descriptor
814 .set("configurable", false)
815 .context("failed to set property descriptor configurable flag")?;
816 descriptor
817 .set("enumerable", enumerable)
818 .context("failed to set property descriptor enumerable flag")?;
819
820 let object: Object<'js> = ctx
821 .globals()
822 .get("Object")
823 .context("failed to get Object")?;
824 let define_property: Function<'js> = object
825 .get("defineProperty")
826 .context("failed to get Object.defineProperty")?;
827 define_property
828 .call::<_, ()>((target.clone(), name, descriptor))
829 .catch(ctx)
830 .map_err(|error| anyhow!("Object.defineProperty failed for {name}: {error:?}"))
831}
832
833fn create_budget_object<'js>(
834 ctx: &rquickjs::Ctx<'js>,
835 state: Rc<RefCell<RuntimeState>>,
836) -> anyhow::Result<Object<'js>> {
837 let object = Object::new(ctx.clone()).context("failed to create workflow budget object")?;
838
839 let total_state = Rc::clone(&state);
840 object
841 .prop(
842 "total",
843 Accessor::from(
844 move |ctx: rquickjs::Ctx<'js>| -> rquickjs::Result<Value<'js>> {
845 rquickjs_serde::to_value(ctx, total_state.borrow().budget.total).map_err(
846 |error| rquickjs::Error::IntoJs {
847 from: "WorkflowBudgetSnapshot.total",
848 to: "value",
849 message: Some(error.to_string()),
850 },
851 )
852 },
853 )
854 .enumerable(),
855 )
856 .context("failed to install workflow budget total")?;
857
858 let spent_state = Rc::clone(&state);
859 object
860 .prop(
861 "spent",
862 Property::from(Func::from(move || spent_state.borrow().budget.spent)).enumerable(),
863 )
864 .context("failed to install workflow budget spent function")?;
865
866 object
867 .prop(
868 "remaining",
869 Property::from(Func::from(move || {
870 let budget = &state.borrow().budget;
871 match budget.total {
872 Some(total) => total.saturating_sub(budget.spent) as f64,
873 None => f64::INFINITY,
874 }
875 }))
876 .enumerable(),
877 )
878 .context("failed to install workflow budget remaining function")?;
879
880 Ok(object)
881}
882
883fn install_native_workflow_functions<'js>(
884 globals: &Object<'js>,
885 state: Rc<RefCell<RuntimeState>>,
886) -> anyhow::Result<()> {
887 let log_state = Rc::clone(&state);
888 globals
889 .prop(
890 "log",
891 Property::from(Func::from(MutFn::from(move |values: Rest<Value<'js>>| {
892 let values = values
893 .0
894 .into_iter()
895 .map(rquickjs_serde::from_value::<serde_json::Value>)
896 .collect::<Result<Vec<_>, _>>()
897 .map_err(|error| rquickjs::Error::FromJs {
898 from: "value",
899 to: "serde_json::Value",
900 message: Some(error.to_string()),
901 })?;
902 log_state
903 .borrow_mut()
904 .calls
905 .push_back(WorkflowRuntimeCall::Log { values });
906 Ok::<(), rquickjs::Error>(())
907 })))
908 .enumerable()
909 .configurable(),
910 )
911 .context("failed to install workflow log global")?;
912
913 let phase_state = Rc::clone(&state);
914 globals
915 .prop(
916 "phase",
917 Property::from(Func::from(MutFn::from(
918 move |name: String, options: Opt<Value<'js>>| {
919 let options = match options.0 {
920 Some(value) => Some(
921 rquickjs_serde::from_value::<serde_json::Value>(value).map_err(
922 |error| rquickjs::Error::FromJs {
923 from: "value",
924 to: "serde_json::Value",
925 message: Some(error.to_string()),
926 },
927 )?,
928 ),
929 None => None,
930 };
931 let mut state = phase_state.borrow_mut();
932 state.current_phase = Some(name.clone());
933 state
934 .calls
935 .push_back(WorkflowRuntimeCall::Phase { name, options });
936 Ok::<(), rquickjs::Error>(())
937 },
938 )))
939 .enumerable()
940 .configurable(),
941 )
942 .context("failed to install workflow phase global")?;
943
944 let agent_state = Rc::clone(&state);
945 globals
946 .prop(
947 "agent",
948 Property::from(Func::from(MutFn::from(
949 move |ctx: rquickjs::Ctx<'js>, prompt: String, options: Opt<Value<'js>>| {
950 let options = match options.0 {
951 Some(value) => Some(
952 rquickjs_serde::from_value::<serde_json::Value>(value).map_err(
953 |error| rquickjs::Error::FromJs {
954 from: "value",
955 to: "serde_json::Value",
956 message: Some(error.to_string()),
957 },
958 )?,
959 ),
960 None => None,
961 };
962 create_pending_request(&ctx, &agent_state, |id, state| {
963 let mut options = options.unwrap_or_else(|| serde_json::json!({}));
964 if let Some(current_phase) = state.current_phase.clone() {
965 if options.get("phase").is_none() {
966 options["phase"] = serde_json::Value::String(current_phase);
967 }
968 }
969 let options = if options.as_object().is_some_and(|object| object.is_empty())
970 {
971 None
972 } else {
973 Some(options)
974 };
975 WorkflowRuntimeRequest::Agent {
976 id,
977 prompt,
978 options,
979 }
980 })
981 },
982 )))
983 .enumerable()
984 .configurable(),
985 )
986 .context("failed to install workflow agent global")?;
987
988 let workflow_state = Rc::clone(&state);
989 globals
990 .prop(
991 "workflow",
992 Property::from(Func::from(MutFn::from(
993 move |ctx: rquickjs::Ctx<'js>, workflow_ref: Value<'js>, args: Opt<Value<'js>>| {
994 let workflow_ref = rquickjs_serde::from_value::<super::WorkflowRef>(
995 workflow_ref,
996 )
997 .map_err(|error| rquickjs::Error::FromJs {
998 from: "value",
999 to: "WorkflowRef",
1000 message: Some(error.to_string()),
1001 })?;
1002 let args = match args.0 {
1003 Some(value) => Some(
1004 rquickjs_serde::from_value::<serde_json::Value>(value).map_err(
1005 |error| rquickjs::Error::FromJs {
1006 from: "value",
1007 to: "serde_json::Value",
1008 message: Some(error.to_string()),
1009 },
1010 )?,
1011 ),
1012 None => None,
1013 };
1014 create_pending_request(&ctx, &workflow_state, |id, _state| {
1015 WorkflowRuntimeRequest::Workflow {
1016 id,
1017 workflow_ref,
1018 args,
1019 }
1020 })
1021 },
1022 )))
1023 .enumerable()
1024 .configurable(),
1025 )
1026 .context("failed to install workflow child workflow global")?;
1027
1028 let sw = create_sw_object(globals.ctx(), state)?;
1029 globals
1030 .prop("SW", Property::from(sw).enumerable().configurable())
1031 .context("failed to install workflow SW global")?;
1032
1033 Ok(())
1034}
1035
1036fn create_sw_object<'js>(
1037 ctx: &rquickjs::Ctx<'js>,
1038 state: Rc<RefCell<RuntimeState>>,
1039) -> anyhow::Result<Object<'js>> {
1040 let sw = Object::new(ctx.clone()).context("failed to create workflow SW object")?;
1041 let extra = create_extra_object(ctx, state)?;
1042 sw.prop("extra", Property::from(extra).enumerable())
1043 .context("failed to install workflow SW.extra object")?;
1044 Ok(sw)
1045}
1046
1047fn create_extra_object<'js>(
1048 ctx: &rquickjs::Ctx<'js>,
1049 state: Rc<RefCell<RuntimeState>>,
1050) -> anyhow::Result<Object<'js>> {
1051 let extra = Object::new(ctx.clone()).context("failed to create workflow extra object")?;
1052 let sleep_state = Rc::clone(&state);
1053 extra
1054 .prop(
1055 "sleep",
1056 Property::from(Func::from(MutFn::from(
1057 move |ctx: rquickjs::Ctx<'js>, duration: Value<'js>| {
1058 let max_sleep_ms = sleep_state.borrow().max_sleep_ms;
1059 let duration_ms = validate_sleep_duration(&ctx, &duration, max_sleep_ms)?;
1060 create_pending_request(&ctx, &sleep_state, |id, _state| {
1061 WorkflowRuntimeRequest::Sleep { id, duration_ms }
1062 })
1063 },
1064 )))
1065 .enumerable(),
1066 )
1067 .context("failed to install workflow extra sleep function")?;
1068 Ok(extra)
1069}
1070
1071fn validate_sleep_duration<'js>(
1072 ctx: &rquickjs::Ctx<'js>,
1073 value: &Value<'js>,
1074 max_sleep_ms: u64,
1075) -> rquickjs::Result<u64> {
1076 let Some(number) = value.as_number() else {
1077 return Err(Exception::throw_message(
1078 ctx,
1079 "sleep(ms) requires a finite non-negative number",
1080 ));
1081 };
1082 if !number.is_finite() || number < 0.0 {
1083 return Err(Exception::throw_message(
1084 ctx,
1085 "sleep(ms) requires a finite non-negative number",
1086 ));
1087 }
1088 let duration_ms = number.ceil();
1089 if duration_ms > max_sleep_ms as f64 {
1090 return Err(Exception::throw_message(
1091 ctx,
1092 "sleep(ms) duration exceeds the maximum allowed delay",
1093 ));
1094 }
1095 Ok(duration_ms as u64)
1096}
1097
1098fn create_pending_request<'js>(
1099 ctx: &rquickjs::Ctx<'js>,
1100 state: &Rc<RefCell<RuntimeState>>,
1101 make_request: impl FnOnce(String, &mut RuntimeState) -> WorkflowRuntimeRequest,
1102) -> rquickjs::Result<Promise<'js>> {
1103 let (promise, resolve, reject) = ctx.promise()?;
1104 let mut state = state.borrow_mut();
1105 state.next_request_id += 1;
1106 let id = state.next_request_id.to_string();
1107 let request = make_request(id.clone(), &mut state);
1108 log::debug!("quickjs queued request id={} kind={}", id, request.kind());
1109 state.pending_requests.insert(
1110 id,
1111 PendingRequest {
1112 resolve: Persistent::save(ctx, resolve),
1113 reject: Persistent::save(ctx, reject),
1114 },
1115 );
1116 state.requests.push_back(request);
1117 Ok(promise)
1118}
1119
1120#[cfg(test)]
1121mod tests {
1122 use super::*;
1123 use crate::js_runtime::{WorkflowModuleInput, WorkflowRuntimePoll};
1124 use serde_json::json;
1125
1126 #[test]
1127 fn executes_default_export_object() {
1128 let mut execution = RQuickJSWorkflowRuntime::new()
1129 .start_module(WorkflowModuleInput::new(
1130 r#"
1131export const meta = { name: "inline", description: "inline" };
1132export default { ok: true, args };
1133"#,
1134 "inline.workflow.js",
1135 json!({ "value": 1 }),
1136 ))
1137 .expect("workflow should start");
1138
1139 let output = loop {
1140 match execution.poll().expect("workflow should poll") {
1141 WorkflowRuntimePoll::Complete(output) => break output,
1142 WorkflowRuntimePoll::Pending => continue,
1143 other => panic!("expected completion, got {other:?}"),
1144 }
1145 };
1146
1147 assert_eq!(output.result, json!({ "ok": true, "args": { "value": 1 } }));
1148 }
1149}