1use std::{
2 cell::RefCell,
3 collections::HashMap,
4 fmt,
5 rc::Rc,
6 sync::{
7 Arc, Condvar, Mutex as StdMutex,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::{Duration, Instant},
11};
12
13use deno_ast::{MediaType, ParseParams, SourceMapOption};
14use deno_core::{Extension, JsRuntime, OpState, PollEventLoopOptions, RuntimeOptions, op2, v8};
15use deno_error::JsErrorBox;
16use langshell_core::{
17 CallStatus, ErrorObject, ExternalCallRecord, Language, LanguageRuntime, Metrics, RunRequest,
18 RunResult, RunStatus, RuntimeFuture, SessionId, SessionLimits, ToolCallContext, ToolRegistry,
19 digest_bytes, digest_json,
20};
21use serde::{Deserialize, Serialize};
22use serde_json::{Map, Value, json};
23use tokio::sync::{mpsc, oneshot};
24
/// Magic string written into the `magic` field of every serialized Deno
/// session snapshot. [`is_deno_snapshot`] and the restore path compare the
/// stored value against this constant.
pub const DENO_SNAPSHOT_MAGIC: &str = "langshell-deno-snapshot/v1";
26
/// Cloneable handle to the Deno-backed TypeScript runtime.
///
/// The handle only owns the sending half of an unbounded command channel;
/// the commands are consumed by the worker started in [`DenoRuntime::new`].
#[derive(Clone)]
pub struct DenoRuntime {
    /// Sender used to submit `DenoCommand`s to the worker.
    tx: mpsc::UnboundedSender<DenoCommand>,
}
31
32impl fmt::Debug for DenoRuntime {
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 f.debug_struct("DenoRuntime").finish_non_exhaustive()
35 }
36}
37
38impl DenoRuntime {
39 pub fn new(registry: ToolRegistry, default_limits: SessionLimits) -> Self {
40 let (tx, rx) = mpsc::unbounded_channel();
41 std::thread::Builder::new()
42 .name("langshell-deno".to_owned())
43 .spawn(move || run_worker_thread(rx, registry, default_limits))
44 .expect("failed to spawn langshell-deno worker thread");
45 Self { tx }
46 }
47
48 pub async fn create_session(
49 &self,
50 session_id: SessionId,
51 limits: Option<SessionLimits>,
52 ) -> Result<(), ErrorObject> {
53 let (reply, rx) = oneshot::channel();
54 self.send(DenoCommand::CreateSession {
55 session_id,
56 limits,
57 reply,
58 })?;
59 rx.await.unwrap_or_else(|_| Err(worker_closed_error()))
60 }
61
62 pub async fn run(&self, request: RunRequest) -> RunResult {
63 let (reply, rx) = oneshot::channel();
64 if let Err(error) = self.send(DenoCommand::Run { request, reply }) {
65 return runtime_error_result(error);
66 }
67 rx.await
68 .unwrap_or_else(|_| runtime_error_result(worker_closed_error()))
69 }
70
71 pub async fn destroy_session(&self, session_id: &SessionId) -> Result<bool, ErrorObject> {
72 let (reply, rx) = oneshot::channel();
73 self.send(DenoCommand::DestroySession {
74 session_id: session_id.clone(),
75 reply,
76 })?;
77 rx.await.map_err(|_| worker_closed_error())
78 }
79
80 pub async fn list_sessions(&self) -> Result<Vec<SessionId>, ErrorObject> {
81 let (reply, rx) = oneshot::channel();
82 self.send(DenoCommand::ListSessions { reply })?;
83 rx.await.map_err(|_| worker_closed_error())
84 }
85
86 pub async fn snapshot_session(&self, session_id: &SessionId) -> Result<Vec<u8>, ErrorObject> {
87 let (reply, rx) = oneshot::channel();
88 self.send(DenoCommand::SnapshotSession {
89 session_id: session_id.clone(),
90 reply,
91 })?;
92 rx.await.unwrap_or_else(|_| Err(worker_closed_error()))
93 }
94
95 pub async fn restore_session(
96 &self,
97 snapshot: &[u8],
98 session_id: Option<SessionId>,
99 ) -> Result<SessionId, ErrorObject> {
100 let (reply, rx) = oneshot::channel();
101 self.send(DenoCommand::RestoreSession {
102 snapshot: snapshot.to_vec(),
103 session_id,
104 reply,
105 })?;
106 rx.await.unwrap_or_else(|_| Err(worker_closed_error()))
107 }
108
109 fn send(&self, command: DenoCommand) -> Result<(), ErrorObject> {
110 self.tx.send(command).map_err(|_| worker_closed_error())
111 }
112}
113
114impl LanguageRuntime for DenoRuntime {
115 fn language(&self) -> Language {
116 Language::TypeScript
117 }
118
119 fn create_session(
120 &self,
121 session_id: SessionId,
122 limits: Option<SessionLimits>,
123 ) -> RuntimeFuture<'_, Result<(), ErrorObject>> {
124 Box::pin(async move { DenoRuntime::create_session(self, session_id, limits).await })
125 }
126
127 fn run(&self, request: RunRequest) -> RuntimeFuture<'_, RunResult> {
128 Box::pin(async move { DenoRuntime::run(self, request).await })
129 }
130
131 fn destroy_session(
132 &self,
133 session_id: SessionId,
134 ) -> RuntimeFuture<'_, Result<bool, ErrorObject>> {
135 Box::pin(async move { DenoRuntime::destroy_session(self, &session_id).await })
136 }
137
138 fn list_sessions(&self) -> RuntimeFuture<'_, Result<Vec<SessionId>, ErrorObject>> {
139 Box::pin(async move { DenoRuntime::list_sessions(self).await })
140 }
141
142 fn snapshot_session(
143 &self,
144 session_id: SessionId,
145 ) -> RuntimeFuture<'_, Result<Vec<u8>, ErrorObject>> {
146 Box::pin(async move { DenoRuntime::snapshot_session(self, &session_id).await })
147 }
148
149 fn restore_session(
150 &self,
151 snapshot: Vec<u8>,
152 session_id: Option<SessionId>,
153 ) -> RuntimeFuture<'_, Result<SessionId, ErrorObject>> {
154 Box::pin(async move { DenoRuntime::restore_session(self, &snapshot, session_id).await })
155 }
156
157 fn can_restore_snapshot(&self, snapshot: &[u8]) -> bool {
158 is_deno_snapshot(snapshot)
159 }
160}
161
162pub fn is_deno_snapshot(snapshot: &[u8]) -> bool {
163 serde_json::from_slice::<serde_json::Value>(snapshot)
164 .ok()
165 .and_then(|value| {
166 value
167 .get("magic")
168 .and_then(Value::as_str)
169 .map(str::to_owned)
170 })
171 .as_deref()
172 == Some(DENO_SNAPSHOT_MAGIC)
173}
174
/// Requests sent from [`DenoRuntime`] handles to the worker. Every variant
/// carries a oneshot `reply` sender on which the worker posts the outcome.
enum DenoCommand {
    /// Create the session unless one with this id already exists.
    CreateSession {
        session_id: SessionId,
        /// Per-session limits; the worker falls back to its defaults when `None`.
        limits: Option<SessionLimits>,
        reply: oneshot::Sender<Result<(), ErrorObject>>,
    },
    /// Execute (or only validate) a run request.
    Run {
        request: RunRequest,
        reply: oneshot::Sender<RunResult>,
    },
    /// Drop a session; the reply says whether it existed.
    DestroySession {
        session_id: SessionId,
        reply: oneshot::Sender<bool>,
    },
    /// Report the ids of all live sessions.
    ListSessions {
        reply: oneshot::Sender<Vec<SessionId>>,
    },
    /// Serialize a session into snapshot bytes.
    SnapshotSession {
        session_id: SessionId,
        reply: oneshot::Sender<Result<Vec<u8>, ErrorObject>>,
    },
    /// Rebuild a session from snapshot bytes.
    RestoreSession {
        snapshot: Vec<u8>,
        /// Overrides the id stored in the snapshot when `Some`.
        session_id: Option<SessionId>,
        reply: oneshot::Sender<Result<SessionId, ErrorObject>>,
    },
}
202
203fn run_worker_thread(
204 mut rx: mpsc::UnboundedReceiver<DenoCommand>,
205 registry: ToolRegistry,
206 default_limits: SessionLimits,
207) {
208 let runtime = tokio::runtime::Builder::new_current_thread()
209 .enable_all()
210 .build()
211 .expect("failed to build langshell-deno tokio runtime");
212 runtime.block_on(async move {
213 let mut worker = DenoWorker::new(registry, default_limits);
214 while let Some(command) = rx.recv().await {
215 worker.handle(command).await;
216 }
217 });
218}
219
/// State owned by the worker: the live sessions plus the configuration
/// needed to create new ones.
struct DenoWorker {
    /// Live sessions keyed by the string inside their `SessionId`.
    sessions: HashMap<String, DenoSession>,
    /// Tool registry handed to every session and used for validation.
    registry: ToolRegistry,
    /// Limits applied when a request or session does not bring its own.
    default_limits: SessionLimits,
}
225
226impl DenoWorker {
227 fn new(registry: ToolRegistry, default_limits: SessionLimits) -> Self {
228 Self {
229 sessions: HashMap::new(),
230 registry,
231 default_limits,
232 }
233 }
234
235 async fn handle(&mut self, command: DenoCommand) {
236 match command {
237 DenoCommand::CreateSession {
238 session_id,
239 limits,
240 reply,
241 } => {
242 let result = self.create_session(session_id, limits);
243 let _ = reply.send(result);
244 }
245 DenoCommand::Run { request, reply } => {
246 let result = self.run(request).await;
247 let _ = reply.send(result);
248 }
249 DenoCommand::DestroySession { session_id, reply } => {
250 let _ = reply.send(self.sessions.remove(&session_id.0).is_some());
251 }
252 DenoCommand::ListSessions { reply } => {
253 let mut ids: Vec<_> = self.sessions.keys().cloned().map(SessionId).collect();
254 ids.sort_by(|a, b| a.0.cmp(&b.0));
255 let _ = reply.send(ids);
256 }
257 DenoCommand::SnapshotSession { session_id, reply } => {
258 let result = self.snapshot_session(&session_id);
259 let _ = reply.send(result);
260 }
261 DenoCommand::RestoreSession {
262 snapshot,
263 session_id,
264 reply,
265 } => {
266 let result = self.restore_session(&snapshot, session_id);
267 let _ = reply.send(result);
268 }
269 }
270 }
271
272 fn create_session(
273 &mut self,
274 session_id: SessionId,
275 limits: Option<SessionLimits>,
276 ) -> Result<(), ErrorObject> {
277 if !self.sessions.contains_key(&session_id.0) {
278 let limits = limits.unwrap_or_else(|| self.default_limits.clone());
279 let session = DenoSession::new(session_id.clone(), limits, &self.registry)?;
280 self.sessions.insert(session_id.0, session);
281 }
282 Ok(())
283 }
284
285 async fn run(&mut self, request: RunRequest) -> RunResult {
286 if request.language != Language::TypeScript {
287 return RunResult::error(
288 RunStatus::ValidationError,
289 ErrorObject::new(
290 "UNSUPPORTED_FEATURE",
291 "The Deno backend only executes TypeScript.",
292 ),
293 String::new(),
294 Metrics::default(),
295 );
296 }
297 if request.validate_only {
298 return validate_request(&request, &self.registry);
299 }
300
301 let limits = effective_limits(&self.default_limits, &request);
302 let session_id = request.session_id.clone();
303 let mut session = match self.sessions.remove(&session_id.0) {
304 Some(mut session) => {
305 session.limits = limits;
306 session
307 }
308 None => match DenoSession::new(session_id.clone(), limits, &self.registry) {
309 Ok(session) => session,
310 Err(error) => {
311 return RunResult::error(
312 RunStatus::RuntimeError,
313 error,
314 String::new(),
315 Metrics::default(),
316 );
317 }
318 },
319 };
320
321 let result = session.run(request, &self.registry).await;
322 self.sessions.insert(session_id.0, session);
323 result
324 }
325
326 fn snapshot_session(&mut self, session_id: &SessionId) -> Result<Vec<u8>, ErrorObject> {
327 let session = self.sessions.get_mut(&session_id.0).ok_or_else(|| {
328 ErrorObject::new(
329 "SESSION_NOT_FOUND",
330 format!("TypeScript session {} does not exist.", session_id.0),
331 )
332 })?;
333 let snapshot = SnapshotEnvelope {
334 magic: DENO_SNAPSHOT_MAGIC.to_owned(),
335 version: langshell_core::SNAPSHOT_VERSION,
336 session_id: session_id.0.clone(),
337 limits: session.limits.clone(),
338 globals: session.snapshot_globals()?,
339 capability_digest: capability_digest(&self.registry),
340 };
341 serde_json::to_vec(&snapshot).map_err(|err| {
342 ErrorObject::new(
343 "SNAPSHOT_CORRUPT",
344 format!("Failed to serialize Deno snapshot: {err}"),
345 )
346 })
347 }
348
349 fn restore_session(
350 &mut self,
351 snapshot: &[u8],
352 session_id: Option<SessionId>,
353 ) -> Result<SessionId, ErrorObject> {
354 let snapshot: SnapshotEnvelope = serde_json::from_slice(snapshot).map_err(|err| {
355 ErrorObject::new("SNAPSHOT_CORRUPT", format!("Invalid Deno snapshot: {err}"))
356 })?;
357 if snapshot.magic != DENO_SNAPSHOT_MAGIC {
358 return Err(ErrorObject::new(
359 "SNAPSHOT_CORRUPT",
360 "Deno snapshot magic mismatch.",
361 ));
362 }
363 if snapshot.version != langshell_core::SNAPSHOT_VERSION {
364 return Err(ErrorObject::new(
365 "SNAPSHOT_VERSION_MISMATCH",
366 format!("Snapshot version {} is not supported.", snapshot.version),
367 ));
368 }
369 if snapshot.capability_digest != capability_digest(&self.registry) {
370 return Err(ErrorObject::new(
371 "SNAPSHOT_CAPABILITY_MISMATCH",
372 "Snapshot was created with a different capability set.",
373 ));
374 }
375
376 let id = session_id.unwrap_or(SessionId(snapshot.session_id));
377 let mut session = DenoSession::new(id.clone(), snapshot.limits, &self.registry)?;
378 session.restore_globals(snapshot.globals)?;
379 self.sessions.insert(id.0.clone(), session);
380 Ok(id)
381 }
382}
383
/// One TypeScript session: its id, the limits currently in force and the
/// `JsRuntime` that holds its JavaScript state.
struct DenoSession {
    /// Session identifier; also used to derive `snapshot_id` in run results.
    id: SessionId,
    /// Limits applied to runs; the worker overwrites them before each run.
    limits: SessionLimits,
    /// JavaScript runtime owned by this session.
    runtime: JsRuntime,
}
389
390impl DenoSession {
391 fn new(
392 id: SessionId,
393 limits: SessionLimits,
394 registry: &ToolRegistry,
395 ) -> Result<Self, ErrorObject> {
396 let create_params = v8::Isolate::create_params().heap_limits(
397 0,
398 usize::try_from(limits.memory_mb).unwrap_or(usize::MAX / 1024 / 1024) * 1024 * 1024,
399 );
400 let mut runtime = JsRuntime::try_new(RuntimeOptions {
401 extensions: vec![langshell_extension(registry.clone(), limits.clone())],
402 create_params: Some(create_params),
403 ..Default::default()
404 })
405 .map_err(|err| ErrorObject::new("RUNTIME_ERROR", err.to_string()))?;
406
407 install_tool_globals(&mut runtime, registry)?;
408 Ok(Self {
409 id,
410 limits,
411 runtime,
412 })
413 }
414
415 async fn run(&mut self, request: RunRequest, registry: &ToolRegistry) -> RunResult {
416 let started = Instant::now();
417 if let Some(error) = static_validation_error(&request.code, registry) {
418 return RunResult::error(
419 code_to_status(&error.code, true),
420 error,
421 String::new(),
422 metrics(started, 0),
423 );
424 }
425
426 let js = match transpile_typescript(&request.code) {
427 Ok(js) => js,
428 Err(error) => {
429 return RunResult::error(
430 code_to_status(&error.code, true),
431 error,
432 String::new(),
433 metrics(started, 0),
434 );
435 }
436 };
437
438 self.reset_run_state(registry);
439 if let Err(error) = self.inject_inputs(&request.inputs) {
440 return RunResult::error(
441 RunStatus::ValidationError,
442 error,
443 String::new(),
444 metrics(started, 0),
445 );
446 }
447
448 let wrapped = wrap_user_code(&js);
449 let run_error = self
450 .execute_user_code(
451 &wrapped,
452 Duration::from_millis(u64::from(request.timeout_ms.unwrap_or(self.limits.wall_ms))),
453 )
454 .await
455 .err();
456 let state = self.run_state_snapshot();
457 if let Some(error) = run_error.or(state.error.clone()) {
458 let mut result = RunResult::error(
459 code_to_status(&error.code, false),
460 error,
461 truncate_stdout(state.stdout, &self.limits),
462 metrics(started, state.records.len() as u32),
463 );
464 result.stderr = truncate_stdout(state.stderr, &self.limits);
465 result.external_calls = state.records;
466 return result;
467 }
468
469 let result_value = match self.read_result() {
470 Ok(value) => value,
471 Err(error) => {
472 let mut result = RunResult::error(
473 RunStatus::ValidationError,
474 error,
475 truncate_stdout(state.stdout, &self.limits),
476 metrics(started, state.records.len() as u32),
477 );
478 result.stderr = truncate_stdout(state.stderr, &self.limits);
479 result.external_calls = state.records;
480 return result;
481 }
482 };
483
484 let mut result = RunResult::ok(
485 result_value,
486 truncate_stdout(state.stdout, &self.limits),
487 metrics(started, state.records.len() as u32),
488 );
489 result.stderr = truncate_stdout(state.stderr, &self.limits);
490 result.external_calls = state.records;
491 if request.return_snapshot {
492 result.snapshot_id = Some(format!("snap_{}", digest_bytes(self.id.0.as_bytes())));
493 }
494 result
495 }
496
497 fn reset_run_state(&mut self, registry: &ToolRegistry) {
498 let op_state = self.runtime.op_state();
499 let mut state = op_state.borrow_mut();
500 let data = state.borrow_mut::<DenoOpState>();
501 data.registry = registry.clone();
502 data.limits = self.limits.clone();
503 data.stdout.clear();
504 data.stderr.clear();
505 data.records.clear();
506 data.started_calls = 0;
507 data.error = None;
508 }
509
510 fn run_state_snapshot(&self) -> RunStateSnapshot {
511 let op_state = self.runtime.op_state();
512 let state = op_state.borrow();
513 let data = state.borrow::<DenoOpState>();
514 RunStateSnapshot {
515 stdout: data.stdout.clone(),
516 stderr: data.stderr.clone(),
517 records: data.records.clone(),
518 error: data.error.clone(),
519 }
520 }
521
522 fn inject_inputs(&mut self, inputs: &Map<String, Value>) -> Result<(), ErrorObject> {
523 if inputs.is_empty() {
524 return Ok(());
525 }
526 let inputs = serde_json::to_string(inputs).map_err(|err| {
527 ErrorObject::new("INVALID_ARGUMENT", format!("inputs are not JSON: {err}"))
528 })?;
529 execute_script_unit(
530 &mut self.runtime,
531 "<langshell-inputs>",
532 format!("globalThis.__langshell_restore_globals({inputs});"),
533 )
534 }
535
536 async fn execute_user_code(
537 &mut self,
538 code: &str,
539 timeout: Duration,
540 ) -> Result<(), ErrorObject> {
541 let timed_out = Arc::new(AtomicBool::new(false));
542 let timer_done = Arc::new((StdMutex::new(false), Condvar::new()));
543 let handle = self.runtime.v8_isolate().thread_safe_handle();
544 let timer_timed_out = timed_out.clone();
545 let timer_done_thread = timer_done.clone();
546 let timer = std::thread::spawn(move || {
547 let (lock, cvar) = &*timer_done_thread;
548 let done = lock.lock().expect("timer mutex poisoned");
549 let (done, wait) = cvar
550 .wait_timeout_while(done, timeout, |done| !*done)
551 .expect("timer condvar poisoned");
552 if !*done && wait.timed_out() {
553 timer_timed_out.store(true, Ordering::SeqCst);
554 handle.terminate_execution();
555 }
556 });
557
558 let value = match self
559 .runtime
560 .execute_script("<langshell-run>", code.to_owned())
561 {
562 Ok(value) => value,
563 Err(err) => {
564 stop_timer(timer_done, timer);
565 if timed_out.load(Ordering::SeqCst) {
566 self.runtime.v8_isolate().cancel_terminate_execution();
567 return Err(timeout_error());
568 }
569 return Err(error_from_js(err.to_string(), false));
570 }
571 };
572
573 let resolve = self.runtime.resolve(value);
574 let resolved = tokio::time::timeout(
575 timeout,
576 self.runtime
577 .with_event_loop_promise(resolve, PollEventLoopOptions::default()),
578 )
579 .await;
580 stop_timer(timer_done, timer);
581 if timed_out.load(Ordering::SeqCst) {
582 self.runtime.v8_isolate().cancel_terminate_execution();
583 return Err(timeout_error());
584 }
585 match resolved {
586 Ok(Ok(_)) => Ok(()),
587 Ok(Err(err)) => Err(error_from_js(err.to_string(), false)),
588 Err(_) => {
589 self.runtime.v8_isolate().terminate_execution();
590 self.runtime.v8_isolate().cancel_terminate_execution();
591 Err(timeout_error())
592 }
593 }
594 }
595
596 fn read_result(&mut self) -> Result<Option<Value>, ErrorObject> {
597 let value = self
598 .runtime
599 .execute_script("<langshell-result>", "globalThis.result")
600 .map_err(|err| error_from_js(err.to_string(), false))?;
601 v8_to_json(&mut self.runtime, value)
602 }
603
604 fn snapshot_globals(&mut self) -> Result<Value, ErrorObject> {
605 let value = self
606 .runtime
607 .execute_script(
608 "<langshell-snapshot>",
609 "JSON.stringify(globalThis.__langshell_snapshot_globals())",
610 )
611 .map_err(|err| ErrorObject::new("SNAPSHOT_CORRUPT", err.to_string()))?;
612 let globals = v8_to_string(&mut self.runtime, value)?;
613 serde_json::from_str(&globals).map_err(|err| {
614 ErrorObject::new(
615 "SNAPSHOT_CORRUPT",
616 format!("Deno snapshot globals did not produce valid JSON: {err}"),
617 )
618 })
619 }
620
621 fn restore_globals(&mut self, globals: Value) -> Result<(), ErrorObject> {
622 let globals = serde_json::to_string(&globals).map_err(|err| {
623 ErrorObject::new("SNAPSHOT_CORRUPT", format!("Invalid globals: {err}"))
624 })?;
625 execute_script_unit(
626 &mut self.runtime,
627 "<langshell-restore>",
628 format!("globalThis.__langshell_restore_globals({globals});"),
629 )
630 .map_err(|err| ErrorObject::new("SNAPSHOT_CORRUPT", err.message))
631 }
632}
633
/// Tells the watchdog thread that the run is over and waits for it to exit.
///
/// The shared flag is set and the condition variable notified only when the
/// mutex can be locked; a poisoned mutex is skipped. The join result is
/// ignored.
fn stop_timer(timer_done: Arc<(StdMutex<bool>, Condvar)>, timer: std::thread::JoinHandle<()>) {
    match timer_done.0.lock() {
        Ok(mut finished) => {
            *finished = true;
            timer_done.1.notify_one();
        }
        Err(_) => {}
    }
    let _ = timer.join();
}
642
/// JSON envelope produced when a session is snapshotted and checked again
/// when it is restored.
#[derive(Debug, Serialize, Deserialize)]
struct SnapshotEnvelope {
    /// Must equal `DENO_SNAPSHOT_MAGIC`.
    magic: String,
    /// Must equal `langshell_core::SNAPSHOT_VERSION` on restore.
    version: u32,
    /// Id of the session the snapshot was taken from.
    session_id: String,
    /// Limits the session had when it was snapshotted.
    limits: SessionLimits,
    /// Snapshotted global values as a JSON value.
    globals: Value,
    /// Digest of the tool registry at snapshot time; restore rejects a mismatch.
    capability_digest: String,
}
652
/// Per-runtime state stored in the `OpState` and shared by the ops; the
/// session clears the run-specific fields before every run.
struct DenoOpState {
    /// Registry used to look up tools called from JavaScript.
    registry: ToolRegistry,
    /// Limits consulted by the ops (e.g. the external-call limit).
    limits: SessionLimits,
    /// Text written through `op_print` with `is_err == false`.
    stdout: String,
    /// Text written through `op_print` with `is_err == true`.
    stderr: String,
    /// One record per finished tool call.
    records: Vec<ExternalCallRecord>,
    /// Number of tool calls started in the current run.
    started_calls: u32,
    /// Most recent error recorded by an op, if any.
    error: Option<ErrorObject>,
}
662
663impl DenoOpState {
664 fn new(registry: ToolRegistry, limits: SessionLimits) -> Self {
665 Self {
666 registry,
667 limits,
668 stdout: String::new(),
669 stderr: String::new(),
670 records: Vec::new(),
671 started_calls: 0,
672 error: None,
673 }
674 }
675}
676
/// Owned copy of the run-specific parts of `DenoOpState`, taken after a run
/// so the result can be assembled without holding the op state borrowed.
struct RunStateSnapshot {
    /// Captured standard output.
    stdout: String,
    /// Captured standard error.
    stderr: String,
    /// Tool-call records of the run.
    records: Vec<ExternalCallRecord>,
    /// Error recorded by an op during the run, if any.
    error: Option<ErrorObject>,
}
683
684#[op2(fast)]
685pub fn op_print(state: &mut OpState, #[string] msg: &str, is_err: bool) {
686 let data = state.borrow_mut::<DenoOpState>();
687 if is_err {
688 data.stderr.push_str(msg);
689 } else {
690 data.stdout.push_str(msg);
691 }
692}
693
694#[op2]
695#[serde]
696fn op_langshell_call_tool_sync(
697 state: &mut OpState,
698 #[string] name: String,
699 #[serde] args: Vec<serde_json::Value>,
700 #[serde] kwargs: serde_json::Map<String, serde_json::Value>,
701) -> Result<serde_json::Value, JsErrorBox> {
702 let (tool, ctx) = prepare_tool_call(state, name, args, kwargs)?;
703 if tool.async_mode {
704 let error = ErrorObject::new(
705 "TYPE_ERROR",
706 format!(
707 "Tool {} is asynchronous; call it with await.",
708 tool.capability.name
709 ),
710 );
711 state.borrow_mut::<DenoOpState>().error = Some(error.clone());
712 return Err(error_object_to_js(error));
713 }
714 let outcome = futures::executor::block_on(run_tool(tool, ctx));
715 finish_tool_call(state, outcome)
716}
717
718#[op2(async(lazy))]
719#[serde]
720async fn op_langshell_call_tool_async(
721 state: Rc<RefCell<OpState>>,
722 #[string] name: String,
723 #[serde] args: Vec<serde_json::Value>,
724 #[serde] kwargs: serde_json::Map<String, serde_json::Value>,
725) -> Result<serde_json::Value, JsErrorBox> {
726 let (tool, ctx) = {
727 let mut state = state.borrow_mut();
728 prepare_tool_call(&mut state, name, args, kwargs)?
729 };
730 let outcome = run_tool(tool, ctx).await;
731 let mut state = state.borrow_mut();
732 finish_tool_call(&mut state, outcome)
733}
734
735fn langshell_extension(registry: ToolRegistry, limits: SessionLimits) -> Extension {
736 Extension {
737 name: "langshell_deno",
738 ops: std::borrow::Cow::Owned(vec![
739 op_langshell_call_tool_sync(),
740 op_langshell_call_tool_async(),
741 ]),
742 middleware_fn: Some(Box::new(|op| match op.name {
743 "op_print" => op_print(),
744 _ => op,
745 })),
746 op_state_fn: Some(Box::new(move |state| {
747 state.put(DenoOpState::new(registry, limits));
748 })),
749 ..Default::default()
750 }
751}
752
753fn install_tool_globals(
754 runtime: &mut JsRuntime,
755 registry: &ToolRegistry,
756) -> Result<(), ErrorObject> {
757 let tools: Vec<_> = registry
758 .names()
759 .into_iter()
760 .filter_map(|name| {
761 registry.get(&name).map(|tool| {
762 json!({
763 "name": name,
764 "asyncMode": tool.async_mode,
765 })
766 })
767 })
768 .collect();
769 let tools = serde_json::to_string(&tools)
770 .map_err(|err| ErrorObject::new("SERIALIZE_ERROR", format!("tool metadata: {err}")))?;
771 let script = format!(
772 r#"
773const __langshellToolDefs = {tools};
774const __langshellOps = Deno.core.ops;
775Object.defineProperty(globalThis, "__langshell_ops", {{ value: __langshellOps, configurable: false }});
776Object.defineProperty(globalThis, "LangShell", {{
777 value: Object.freeze({{
778 callTool: (name, args = [], kwargs = {{}}) => __langshellOps.op_langshell_call_tool_async(name, args, kwargs),
779 callToolSync: (name, args = [], kwargs = {{}}) => __langshellOps.op_langshell_call_tool_sync(name, args, kwargs),
780 }}),
781 configurable: true,
782}});
783for (const tool of __langshellToolDefs) {{
784 const call = tool.asyncMode
785 ? (...args) => __langshellOps.op_langshell_call_tool_async(tool.name, args, {{}})
786 : (...args) => __langshellOps.op_langshell_call_tool_sync(tool.name, args, {{}});
787 Object.defineProperty(globalThis, tool.name, {{ value: call, writable: false, configurable: true }});
788}}
789Object.defineProperty(globalThis, "__langshell_restore_globals", {{
790 value: (globals) => {{
791 for (const [key, value] of Object.entries(globals ?? {{}})) {{
792 Object.defineProperty(globalThis, key, {{ value, writable: true, configurable: true }});
793 }}
794 }},
795 configurable: false,
796}});
797Object.defineProperty(globalThis, "__langshell_snapshot_globals", {{
798 value: () => {{
799 const out = {{}};
800 for (const key of Object.getOwnPropertyNames(globalThis)) {{
801 if (globalThis.__langshell_baseline.has(key) || key.startsWith("__langshell")) continue;
802 const value = globalThis[key];
803 if (typeof value === "function" || typeof value === "symbol" || typeof value === "undefined" || typeof value === "bigint") continue;
804 try {{
805 JSON.stringify(value);
806 out[key] = value;
807 }} catch (_) {{}}
808 }}
809 return out;
810 }},
811 configurable: false,
812}});
813Object.defineProperty(globalThis, "__langshell_baseline", {{
814 value: new Set(Object.getOwnPropertyNames(globalThis)),
815 configurable: false,
816}});
817try {{
818 Object.defineProperty(globalThis, "Deno", {{ value: undefined, writable: false, configurable: true }});
819}} catch (_) {{}}
820"#
821 );
822 execute_script_unit(runtime, "<langshell-bootstrap>", script)
823}
824
825fn prepare_tool_call(
826 state: &mut OpState,
827 name: String,
828 args: Vec<Value>,
829 kwargs: Map<String, Value>,
830) -> Result<(langshell_core::RegisteredTool, ToolCallContext), JsErrorBox> {
831 let data = state.borrow_mut::<DenoOpState>();
832 data.started_calls = data.started_calls.saturating_add(1);
833 if data.started_calls > data.limits.max_external_calls {
834 let error = ErrorObject::new(
835 "EXTERNAL_CALLS_EXCEEDED",
836 format!(
837 "External call limit {} exceeded.",
838 data.limits.max_external_calls
839 ),
840 );
841 data.error = Some(error.clone());
842 return Err(error_object_to_js(error));
843 }
844 let Some(tool) = data.registry.get(&name).cloned() else {
845 let error = ErrorObject::new(
846 "UNKNOWN_TOOL",
847 format!("Function {name} is not registered."),
848 )
849 .with_hint("Call list_tools() or describe_tool() to inspect registered functions.");
850 data.error = Some(error.clone());
851 return Err(error_object_to_js(error));
852 };
853 Ok((tool, ToolCallContext { name, args, kwargs }))
854}
855
856async fn run_tool(
857 tool: langshell_core::RegisteredTool,
858 ctx: ToolCallContext,
859) -> (Result<Value, ErrorObject>, ExternalCallRecord) {
860 let started = Instant::now();
861 let request_digest = digest_json(&json!({"args": ctx.args, "kwargs": ctx.kwargs}));
862 let side_effect = tool.capability.side_effect;
863 let name = tool.capability.name.clone();
864 match tool.call(ctx).await {
865 Ok(value) => {
866 let response_digest = Some(digest_json(&value));
867 (
868 Ok(value),
869 ExternalCallRecord {
870 name,
871 side_effect,
872 duration_ms: elapsed_ms(started),
873 status: CallStatus::Ok,
874 request_digest,
875 response_digest,
876 error: None,
877 },
878 )
879 }
880 Err(error) => {
881 let error_object = ErrorObject::new(error.code, error.message);
882 (
883 Err(error_object.clone()),
884 ExternalCallRecord {
885 name,
886 side_effect,
887 duration_ms: elapsed_ms(started),
888 status: CallStatus::Error,
889 request_digest,
890 response_digest: None,
891 error: Some(error_object),
892 },
893 )
894 }
895 }
896}
897
898fn finish_tool_call(
899 state: &mut OpState,
900 outcome: (Result<Value, ErrorObject>, ExternalCallRecord),
901) -> Result<Value, JsErrorBox> {
902 let (result, record) = outcome;
903 let data = state.borrow_mut::<DenoOpState>();
904 data.records.push(record);
905 match result {
906 Ok(value) => Ok(value),
907 Err(error) => {
908 data.error = Some(error.clone());
909 Err(error_object_to_js(error))
910 }
911 }
912}
913
914fn validate_request(request: &RunRequest, registry: &ToolRegistry) -> RunResult {
915 let started = Instant::now();
916 if let Some(error) = static_validation_error(&request.code, registry) {
917 return RunResult::error(
918 code_to_status(&error.code, true),
919 error,
920 String::new(),
921 metrics(started, 0),
922 );
923 }
924 match transpile_typescript(&request.code) {
925 Ok(_) => RunResult::ok(None, String::new(), metrics(started, 0)),
926 Err(error) => RunResult::error(
927 code_to_status(&error.code, true),
928 error,
929 String::new(),
930 metrics(started, 0),
931 ),
932 }
933}
934
935fn transpile_typescript(code: &str) -> Result<String, ErrorObject> {
936 let specifier = deno_core::resolve_url("file:///langshell-run.ts")
937 .map_err(|err| ErrorObject::new("RUNTIME_ERROR", err.to_string()))?;
938 let parsed = deno_ast::parse_module(ParseParams {
939 specifier,
940 text: code.to_owned().into(),
941 media_type: MediaType::TypeScript,
942 capture_tokens: false,
943 scope_analysis: false,
944 maybe_syntax: None,
945 })
946 .map_err(|err| ErrorObject::new("SYNTAX_ERROR", err.to_string()))?;
947 let transpiled = parsed
948 .transpile(
949 &deno_ast::TranspileOptions {
950 imports_not_used_as_values: deno_ast::ImportsNotUsedAsValues::Remove,
951 decorators: deno_ast::DecoratorsTranspileOption::Ecma,
952 ..Default::default()
953 },
954 &deno_ast::TranspileModuleOptions { module_kind: None },
955 &deno_ast::EmitOptions {
956 source_map: SourceMapOption::None,
957 inline_sources: false,
958 ..Default::default()
959 },
960 )
961 .map_err(|err| ErrorObject::new("SYNTAX_ERROR", err.to_string()))?;
962 Ok(transpiled.into_source().text)
963}
964
/// Wraps transpiled JavaScript in an immediately invoked async arrow function
/// whose body runs inside `with (globalThis) { ... }`.
fn wrap_user_code(js: &str) -> String {
    const PROLOGUE: &str = "\n(async () => {\n with (globalThis) {\n";
    const EPILOGUE: &str = "\n }\n})()\n";

    let mut wrapped = String::with_capacity(PROLOGUE.len() + js.len() + EPILOGUE.len());
    wrapped.push_str(PROLOGUE);
    wrapped.push_str(js);
    wrapped.push_str(EPILOGUE);
    wrapped
}
976
977fn v8_to_json(
978 runtime: &mut JsRuntime,
979 value: v8::Global<v8::Value>,
980) -> Result<Option<Value>, ErrorObject> {
981 deno_core::scope!(scope, runtime);
982 let local = v8::Local::new(scope, value);
983 if local.is_undefined() {
984 return Ok(None);
985 }
986 serde_v8::from_v8::<Value>(scope, local)
987 .map(Some)
988 .map_err(|err| {
989 ErrorObject::new(
990 "RESULT_NOT_SERIALIZABLE",
991 format!("TypeScript result could not be converted to JSON: {err}"),
992 )
993 .with_hint("Assign result to a JSON-compatible value before returning.")
994 })
995}
996
997fn v8_to_string(
998 runtime: &mut JsRuntime,
999 value: v8::Global<v8::Value>,
1000) -> Result<String, ErrorObject> {
1001 deno_core::scope!(scope, runtime);
1002 let local = v8::Local::new(scope, value);
1003 serde_v8::from_v8::<String>(scope, local).map_err(|err| {
1004 ErrorObject::new(
1005 "RESULT_NOT_SERIALIZABLE",
1006 format!("TypeScript value could not be converted to string: {err}"),
1007 )
1008 })
1009}
1010
1011fn execute_script_unit(
1012 runtime: &mut JsRuntime,
1013 name: &'static str,
1014 source: String,
1015) -> Result<(), ErrorObject> {
1016 runtime
1017 .execute_script(name, source)
1018 .map(|_| ())
1019 .map_err(|err| error_from_js(err.to_string(), false))
1020}
1021
1022fn static_validation_error(code: &str, registry: &ToolRegistry) -> Option<ErrorObject> {
1023 let unsupported = [
1024 "import ",
1025 "export ",
1026 "import(",
1027 "require(",
1028 "Deno.",
1029 "Deno[",
1030 "fetch(",
1031 "XMLHttpRequest",
1032 "WebSocket",
1033 "process.",
1034 "Bun.",
1035 "eval(",
1036 "Function(",
1037 ];
1038 unsupported
1039 .iter()
1040 .find(|pattern| code.contains(**pattern))
1041 .map(|pattern| {
1042 ErrorObject::new(
1043 "UNSUPPORTED_FEATURE",
1044 format!("Use of {pattern:?} is not supported in the LangShell Deno sandbox."),
1045 )
1046 .with_hint(
1047 "Use a registered capability such as read_text, fetch_json, or list_tools instead.",
1048 )
1049 })
1050 .or_else(|| {
1051 let suspicious = ["fetch_url", "query_db", "send_email"];
1052 suspicious
1053 .iter()
1054 .find(|name| code.contains(&format!("{name}(")) && !registry.contains(name))
1055 .map(|name| {
1056 ErrorObject::new(
1057 "UNKNOWN_TOOL",
1058 format!("Function {name} is not registered."),
1059 )
1060 .with_hint("Call list_tools() to inspect available capabilities.")
1061 })
1062 })
1063}
1064
1065fn effective_limits(default_limits: &SessionLimits, request: &RunRequest) -> SessionLimits {
1066 let mut limits = request
1067 .limits
1068 .clone()
1069 .unwrap_or_else(|| default_limits.clone());
1070 if let Some(timeout_ms) = request.timeout_ms {
1071 limits.wall_ms = timeout_ms;
1072 }
1073 limits
1074}
1075
1076fn code_to_status(code: &str, validation: bool) -> RunStatus {
1077 match code {
1078 "PERMISSION_DENIED" => RunStatus::PermissionDenied,
1079 "WAITING_FOR_APPROVAL" => RunStatus::WaitingForApproval,
1080 "TIMEOUT_WALL" | "TIMEOUT_CPU" | "TIMEOUT_TOOL" => RunStatus::Timeout,
1081 "CANCELLED" => RunStatus::Cancelled,
1082 "MEMORY_EXCEEDED" | "STDOUT_EXCEEDED" | "EXTERNAL_CALLS_EXCEEDED" | "STACK_OVERFLOW" => {
1083 RunStatus::ResourceExhausted
1084 }
1085 "SYNTAX_ERROR"
1086 | "TYPE_ERROR"
1087 | "UNKNOWN_TOOL"
1088 | "UNSUPPORTED_FEATURE"
1089 | "RESULT_NOT_SERIALIZABLE"
1090 | "SNAPSHOT_VERSION_MISMATCH"
1091 | "SNAPSHOT_CAPABILITY_MISMATCH"
1092 | "SNAPSHOT_CORRUPT" => RunStatus::ValidationError,
1093 _ if validation => RunStatus::ValidationError,
1094 _ => RunStatus::RuntimeError,
1095 }
1096}
1097
1098fn error_from_js(message: String, validation: bool) -> ErrorObject {
1099 let code = if validation || message.contains("SyntaxError") {
1100 "SYNTAX_ERROR"
1101 } else if message.contains("execution terminated") {
1102 "TIMEOUT_WALL"
1103 } else {
1104 "RUNTIME_ERROR"
1105 };
1106 ErrorObject::new(code, message)
1107}
1108
1109fn error_object_to_js(error: ErrorObject) -> JsErrorBox {
1110 JsErrorBox::generic(format!("{}: {}", error.code, error.message))
1111}
1112
1113fn timeout_error() -> ErrorObject {
1114 ErrorObject::new(
1115 "TIMEOUT_WALL",
1116 "TypeScript execution exceeded the wall-clock limit.",
1117 )
1118}
1119
1120fn runtime_error_result(error: ErrorObject) -> RunResult {
1121 RunResult::error(
1122 RunStatus::RuntimeError,
1123 error,
1124 String::new(),
1125 Metrics::default(),
1126 )
1127}
1128
1129fn worker_closed_error() -> ErrorObject {
1130 ErrorObject::new("RUNTIME_ERROR", "LangShell Deno worker is not available.")
1131}
1132
1133fn truncate_stdout(mut stdout: String, limits: &SessionLimits) -> String {
1134 let max = limits.max_stdout_bytes as usize;
1135 if stdout.len() > max {
1136 stdout.truncate(max);
1137 }
1138 stdout
1139}
1140
1141fn metrics(started: Instant, external_calls_count: u32) -> Metrics {
1142 Metrics {
1143 duration_ms: elapsed_ms(started),
1144 memory_peak_bytes: 0,
1145 instructions: 0,
1146 external_calls_count,
1147 }
1148}
1149
/// Whole milliseconds elapsed since `started`, saturating at `u32::MAX`.
fn elapsed_ms(started: Instant) -> u32 {
    let millis = started.elapsed().as_millis();
    match u32::try_from(millis) {
        Ok(ms) => ms,
        // More than `u32::MAX` milliseconds have passed: clamp.
        Err(_) => u32::MAX,
    }
}
1153
1154fn capability_digest(registry: &ToolRegistry) -> String {
1155 digest_json(&json!(registry.names()))
1156}
1157
#[cfg(test)]
mod tests {
    use super::*;
    use langshell_core::{Capability, RegisteredTool, SideEffect};

    /// Builds a run request for `session` with its language set to TypeScript.
    fn typescript_request(session: &'static str, code: &'static str) -> RunRequest {
        let mut request = RunRequest::new(session, code).unwrap();
        request.language = Language::TypeScript;
        request
    }

    /// A global assigned in one run is readable by a later run in the same session.
    #[tokio::test]
    async fn runs_typescript_and_reuses_state() {
        let runtime = DenoRuntime::new(ToolRegistry::new(), SessionLimits::default());

        let seeded = runtime
            .run(typescript_request("s1", "cache = { k: 1 }"))
            .await;
        assert_eq!(seeded.status, RunStatus::Ok);

        let result = runtime
            .run(typescript_request("s1", "result = cache.k + 1"))
            .await;
        assert_eq!(result.status, RunStatus::Ok, "{result:?}");
        assert_eq!(result.result, Some(json!(2)));
    }

    /// An asynchronous registered tool can be awaited from script code and is
    /// counted as one external call.
    #[tokio::test]
    async fn supports_async_external_function() {
        let echo_url = RegisteredTool::asynchronous(
            Capability::new("fetch_json", "test fetch", SideEffect::Network),
            |ctx| {
                Box::pin(async move {
                    Ok(json!({
                        "url": ctx.args.first().and_then(Value::as_str).unwrap_or_default(),
                    }))
                })
            },
        );
        let mut registry = ToolRegistry::new();
        registry.register(echo_url).unwrap();
        let runtime = DenoRuntime::new(registry, SessionLimits::default());

        let request = typescript_request(
            "s1",
            r#"
data = await fetch_json("https://api.example.com/item")
result = { url: data.url }
"#,
        );
        let result = runtime.run(request).await;

        assert_eq!(result.status, RunStatus::Ok, "{result:?}");
        assert_eq!(
            result.result,
            Some(json!({"url": "https://api.example.com/item"}))
        );
        assert_eq!(result.metrics.external_calls_count, 1);
    }

    /// A snapshot of one session can be restored under a new session id with
    /// its JSON globals intact.
    #[tokio::test]
    async fn snapshots_json_globals() {
        let runtime = DenoRuntime::new(ToolRegistry::new(), SessionLimits::default());

        let seeded = runtime
            .run(typescript_request("s1", "state = { step: 1 }"))
            .await;
        assert_eq!(seeded.status, RunStatus::Ok);

        let source_session = SessionId(String::from("s1"));
        let snapshot = runtime.snapshot_session(&source_session).await.unwrap();
        let restored_session = SessionId(String::from("s2"));
        runtime
            .restore_session(&snapshot, Some(restored_session))
            .await
            .unwrap();

        let result = runtime
            .run(typescript_request("s2", "result = state.step"))
            .await;
        assert_eq!(result.status, RunStatus::Ok, "{result:?}");
        assert_eq!(result.result, Some(json!(1)));
    }
}