Skip to main content

langshell_deno/
lib.rs

1use std::{
2    cell::RefCell,
3    collections::HashMap,
4    fmt,
5    rc::Rc,
6    sync::{
7        Arc, Condvar, Mutex as StdMutex,
8        atomic::{AtomicBool, Ordering},
9    },
10    time::{Duration, Instant},
11};
12
13use deno_ast::{MediaType, ParseParams, SourceMapOption};
14use deno_core::{Extension, JsRuntime, OpState, PollEventLoopOptions, RuntimeOptions, op2, v8};
15use deno_error::JsErrorBox;
16use langshell_core::{
17    CallStatus, ErrorObject, ExternalCallRecord, Language, LanguageRuntime, Metrics, RunRequest,
18    RunResult, RunStatus, RuntimeFuture, SessionId, SessionLimits, ToolCallContext, ToolRegistry,
19    digest_bytes, digest_json,
20};
21use serde::{Deserialize, Serialize};
22use serde_json::{Map, Value, json};
23use tokio::sync::{mpsc, oneshot};
24
25pub const DENO_SNAPSHOT_MAGIC: &str = "langshell-deno-snapshot/v1";
26
27#[derive(Clone)]
28pub struct DenoRuntime {
29    tx: mpsc::UnboundedSender<DenoCommand>,
30}
31
32impl fmt::Debug for DenoRuntime {
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        f.debug_struct("DenoRuntime").finish_non_exhaustive()
35    }
36}
37
38impl DenoRuntime {
39    pub fn new(registry: ToolRegistry, default_limits: SessionLimits) -> Self {
40        let (tx, rx) = mpsc::unbounded_channel();
41        std::thread::Builder::new()
42            .name("langshell-deno".to_owned())
43            .spawn(move || run_worker_thread(rx, registry, default_limits))
44            .expect("failed to spawn langshell-deno worker thread");
45        Self { tx }
46    }
47
48    pub async fn create_session(
49        &self,
50        session_id: SessionId,
51        limits: Option<SessionLimits>,
52    ) -> Result<(), ErrorObject> {
53        let (reply, rx) = oneshot::channel();
54        self.send(DenoCommand::CreateSession {
55            session_id,
56            limits,
57            reply,
58        })?;
59        rx.await.unwrap_or_else(|_| Err(worker_closed_error()))
60    }
61
62    pub async fn run(&self, request: RunRequest) -> RunResult {
63        let (reply, rx) = oneshot::channel();
64        if let Err(error) = self.send(DenoCommand::Run { request, reply }) {
65            return runtime_error_result(error);
66        }
67        rx.await
68            .unwrap_or_else(|_| runtime_error_result(worker_closed_error()))
69    }
70
71    pub async fn destroy_session(&self, session_id: &SessionId) -> Result<bool, ErrorObject> {
72        let (reply, rx) = oneshot::channel();
73        self.send(DenoCommand::DestroySession {
74            session_id: session_id.clone(),
75            reply,
76        })?;
77        rx.await.map_err(|_| worker_closed_error())
78    }
79
80    pub async fn list_sessions(&self) -> Result<Vec<SessionId>, ErrorObject> {
81        let (reply, rx) = oneshot::channel();
82        self.send(DenoCommand::ListSessions { reply })?;
83        rx.await.map_err(|_| worker_closed_error())
84    }
85
86    pub async fn snapshot_session(&self, session_id: &SessionId) -> Result<Vec<u8>, ErrorObject> {
87        let (reply, rx) = oneshot::channel();
88        self.send(DenoCommand::SnapshotSession {
89            session_id: session_id.clone(),
90            reply,
91        })?;
92        rx.await.unwrap_or_else(|_| Err(worker_closed_error()))
93    }
94
95    pub async fn restore_session(
96        &self,
97        snapshot: &[u8],
98        session_id: Option<SessionId>,
99    ) -> Result<SessionId, ErrorObject> {
100        let (reply, rx) = oneshot::channel();
101        self.send(DenoCommand::RestoreSession {
102            snapshot: snapshot.to_vec(),
103            session_id,
104            reply,
105        })?;
106        rx.await.unwrap_or_else(|_| Err(worker_closed_error()))
107    }
108
109    fn send(&self, command: DenoCommand) -> Result<(), ErrorObject> {
110        self.tx.send(command).map_err(|_| worker_closed_error())
111    }
112}
113
114impl LanguageRuntime for DenoRuntime {
115    fn language(&self) -> Language {
116        Language::TypeScript
117    }
118
119    fn create_session(
120        &self,
121        session_id: SessionId,
122        limits: Option<SessionLimits>,
123    ) -> RuntimeFuture<'_, Result<(), ErrorObject>> {
124        Box::pin(async move { DenoRuntime::create_session(self, session_id, limits).await })
125    }
126
127    fn run(&self, request: RunRequest) -> RuntimeFuture<'_, RunResult> {
128        Box::pin(async move { DenoRuntime::run(self, request).await })
129    }
130
131    fn destroy_session(
132        &self,
133        session_id: SessionId,
134    ) -> RuntimeFuture<'_, Result<bool, ErrorObject>> {
135        Box::pin(async move { DenoRuntime::destroy_session(self, &session_id).await })
136    }
137
138    fn list_sessions(&self) -> RuntimeFuture<'_, Result<Vec<SessionId>, ErrorObject>> {
139        Box::pin(async move { DenoRuntime::list_sessions(self).await })
140    }
141
142    fn snapshot_session(
143        &self,
144        session_id: SessionId,
145    ) -> RuntimeFuture<'_, Result<Vec<u8>, ErrorObject>> {
146        Box::pin(async move { DenoRuntime::snapshot_session(self, &session_id).await })
147    }
148
149    fn restore_session(
150        &self,
151        snapshot: Vec<u8>,
152        session_id: Option<SessionId>,
153    ) -> RuntimeFuture<'_, Result<SessionId, ErrorObject>> {
154        Box::pin(async move { DenoRuntime::restore_session(self, &snapshot, session_id).await })
155    }
156
157    fn can_restore_snapshot(&self, snapshot: &[u8]) -> bool {
158        is_deno_snapshot(snapshot)
159    }
160}
161
162pub fn is_deno_snapshot(snapshot: &[u8]) -> bool {
163    serde_json::from_slice::<serde_json::Value>(snapshot)
164        .ok()
165        .and_then(|value| {
166            value
167                .get("magic")
168                .and_then(Value::as_str)
169                .map(str::to_owned)
170        })
171        .as_deref()
172        == Some(DENO_SNAPSHOT_MAGIC)
173}
174
175enum DenoCommand {
176    CreateSession {
177        session_id: SessionId,
178        limits: Option<SessionLimits>,
179        reply: oneshot::Sender<Result<(), ErrorObject>>,
180    },
181    Run {
182        request: RunRequest,
183        reply: oneshot::Sender<RunResult>,
184    },
185    DestroySession {
186        session_id: SessionId,
187        reply: oneshot::Sender<bool>,
188    },
189    ListSessions {
190        reply: oneshot::Sender<Vec<SessionId>>,
191    },
192    SnapshotSession {
193        session_id: SessionId,
194        reply: oneshot::Sender<Result<Vec<u8>, ErrorObject>>,
195    },
196    RestoreSession {
197        snapshot: Vec<u8>,
198        session_id: Option<SessionId>,
199        reply: oneshot::Sender<Result<SessionId, ErrorObject>>,
200    },
201}
202
203fn run_worker_thread(
204    mut rx: mpsc::UnboundedReceiver<DenoCommand>,
205    registry: ToolRegistry,
206    default_limits: SessionLimits,
207) {
208    let runtime = tokio::runtime::Builder::new_current_thread()
209        .enable_all()
210        .build()
211        .expect("failed to build langshell-deno tokio runtime");
212    runtime.block_on(async move {
213        let mut worker = DenoWorker::new(registry, default_limits);
214        while let Some(command) = rx.recv().await {
215            worker.handle(command).await;
216        }
217    });
218}
219
220struct DenoWorker {
221    sessions: HashMap<String, DenoSession>,
222    registry: ToolRegistry,
223    default_limits: SessionLimits,
224}
225
226impl DenoWorker {
227    fn new(registry: ToolRegistry, default_limits: SessionLimits) -> Self {
228        Self {
229            sessions: HashMap::new(),
230            registry,
231            default_limits,
232        }
233    }
234
235    async fn handle(&mut self, command: DenoCommand) {
236        match command {
237            DenoCommand::CreateSession {
238                session_id,
239                limits,
240                reply,
241            } => {
242                let result = self.create_session(session_id, limits);
243                let _ = reply.send(result);
244            }
245            DenoCommand::Run { request, reply } => {
246                let result = self.run(request).await;
247                let _ = reply.send(result);
248            }
249            DenoCommand::DestroySession { session_id, reply } => {
250                let _ = reply.send(self.sessions.remove(&session_id.0).is_some());
251            }
252            DenoCommand::ListSessions { reply } => {
253                let mut ids: Vec<_> = self.sessions.keys().cloned().map(SessionId).collect();
254                ids.sort_by(|a, b| a.0.cmp(&b.0));
255                let _ = reply.send(ids);
256            }
257            DenoCommand::SnapshotSession { session_id, reply } => {
258                let result = self.snapshot_session(&session_id);
259                let _ = reply.send(result);
260            }
261            DenoCommand::RestoreSession {
262                snapshot,
263                session_id,
264                reply,
265            } => {
266                let result = self.restore_session(&snapshot, session_id);
267                let _ = reply.send(result);
268            }
269        }
270    }
271
272    fn create_session(
273        &mut self,
274        session_id: SessionId,
275        limits: Option<SessionLimits>,
276    ) -> Result<(), ErrorObject> {
277        if !self.sessions.contains_key(&session_id.0) {
278            let limits = limits.unwrap_or_else(|| self.default_limits.clone());
279            let session = DenoSession::new(session_id.clone(), limits, &self.registry)?;
280            self.sessions.insert(session_id.0, session);
281        }
282        Ok(())
283    }
284
285    async fn run(&mut self, request: RunRequest) -> RunResult {
286        if request.language != Language::TypeScript {
287            return RunResult::error(
288                RunStatus::ValidationError,
289                ErrorObject::new(
290                    "UNSUPPORTED_FEATURE",
291                    "The Deno backend only executes TypeScript.",
292                ),
293                String::new(),
294                Metrics::default(),
295            );
296        }
297        if request.validate_only {
298            return validate_request(&request, &self.registry);
299        }
300
301        let limits = effective_limits(&self.default_limits, &request);
302        let session_id = request.session_id.clone();
303        let mut session = match self.sessions.remove(&session_id.0) {
304            Some(mut session) => {
305                session.limits = limits;
306                session
307            }
308            None => match DenoSession::new(session_id.clone(), limits, &self.registry) {
309                Ok(session) => session,
310                Err(error) => {
311                    return RunResult::error(
312                        RunStatus::RuntimeError,
313                        error,
314                        String::new(),
315                        Metrics::default(),
316                    );
317                }
318            },
319        };
320
321        let result = session.run(request, &self.registry).await;
322        self.sessions.insert(session_id.0, session);
323        result
324    }
325
326    fn snapshot_session(&mut self, session_id: &SessionId) -> Result<Vec<u8>, ErrorObject> {
327        let session = self.sessions.get_mut(&session_id.0).ok_or_else(|| {
328            ErrorObject::new(
329                "SESSION_NOT_FOUND",
330                format!("TypeScript session {} does not exist.", session_id.0),
331            )
332        })?;
333        let snapshot = SnapshotEnvelope {
334            magic: DENO_SNAPSHOT_MAGIC.to_owned(),
335            version: langshell_core::SNAPSHOT_VERSION,
336            session_id: session_id.0.clone(),
337            limits: session.limits.clone(),
338            globals: session.snapshot_globals()?,
339            capability_digest: capability_digest(&self.registry),
340        };
341        serde_json::to_vec(&snapshot).map_err(|err| {
342            ErrorObject::new(
343                "SNAPSHOT_CORRUPT",
344                format!("Failed to serialize Deno snapshot: {err}"),
345            )
346        })
347    }
348
349    fn restore_session(
350        &mut self,
351        snapshot: &[u8],
352        session_id: Option<SessionId>,
353    ) -> Result<SessionId, ErrorObject> {
354        let snapshot: SnapshotEnvelope = serde_json::from_slice(snapshot).map_err(|err| {
355            ErrorObject::new("SNAPSHOT_CORRUPT", format!("Invalid Deno snapshot: {err}"))
356        })?;
357        if snapshot.magic != DENO_SNAPSHOT_MAGIC {
358            return Err(ErrorObject::new(
359                "SNAPSHOT_CORRUPT",
360                "Deno snapshot magic mismatch.",
361            ));
362        }
363        if snapshot.version != langshell_core::SNAPSHOT_VERSION {
364            return Err(ErrorObject::new(
365                "SNAPSHOT_VERSION_MISMATCH",
366                format!("Snapshot version {} is not supported.", snapshot.version),
367            ));
368        }
369        if snapshot.capability_digest != capability_digest(&self.registry) {
370            return Err(ErrorObject::new(
371                "SNAPSHOT_CAPABILITY_MISMATCH",
372                "Snapshot was created with a different capability set.",
373            ));
374        }
375
376        let id = session_id.unwrap_or(SessionId(snapshot.session_id));
377        let mut session = DenoSession::new(id.clone(), snapshot.limits, &self.registry)?;
378        session.restore_globals(snapshot.globals)?;
379        self.sessions.insert(id.0.clone(), session);
380        Ok(id)
381    }
382}
383
384struct DenoSession {
385    id: SessionId,
386    limits: SessionLimits,
387    runtime: JsRuntime,
388}
389
390impl DenoSession {
391    fn new(
392        id: SessionId,
393        limits: SessionLimits,
394        registry: &ToolRegistry,
395    ) -> Result<Self, ErrorObject> {
396        let create_params = v8::Isolate::create_params().heap_limits(
397            0,
398            usize::try_from(limits.memory_mb).unwrap_or(usize::MAX / 1024 / 1024) * 1024 * 1024,
399        );
400        let mut runtime = JsRuntime::try_new(RuntimeOptions {
401            extensions: vec![langshell_extension(registry.clone(), limits.clone())],
402            create_params: Some(create_params),
403            ..Default::default()
404        })
405        .map_err(|err| ErrorObject::new("RUNTIME_ERROR", err.to_string()))?;
406
407        install_tool_globals(&mut runtime, registry)?;
408        Ok(Self {
409            id,
410            limits,
411            runtime,
412        })
413    }
414
415    async fn run(&mut self, request: RunRequest, registry: &ToolRegistry) -> RunResult {
416        let started = Instant::now();
417        if let Some(error) = static_validation_error(&request.code, registry) {
418            return RunResult::error(
419                code_to_status(&error.code, true),
420                error,
421                String::new(),
422                metrics(started, 0),
423            );
424        }
425
426        let js = match transpile_typescript(&request.code) {
427            Ok(js) => js,
428            Err(error) => {
429                return RunResult::error(
430                    code_to_status(&error.code, true),
431                    error,
432                    String::new(),
433                    metrics(started, 0),
434                );
435            }
436        };
437
438        self.reset_run_state(registry);
439        if let Err(error) = self.inject_inputs(&request.inputs) {
440            return RunResult::error(
441                RunStatus::ValidationError,
442                error,
443                String::new(),
444                metrics(started, 0),
445            );
446        }
447
448        let wrapped = wrap_user_code(&js);
449        let run_error = self
450            .execute_user_code(
451                &wrapped,
452                Duration::from_millis(u64::from(request.timeout_ms.unwrap_or(self.limits.wall_ms))),
453            )
454            .await
455            .err();
456        let state = self.run_state_snapshot();
457        if let Some(error) = run_error.or(state.error.clone()) {
458            let mut result = RunResult::error(
459                code_to_status(&error.code, false),
460                error,
461                truncate_stdout(state.stdout, &self.limits),
462                metrics(started, state.records.len() as u32),
463            );
464            result.stderr = truncate_stdout(state.stderr, &self.limits);
465            result.external_calls = state.records;
466            return result;
467        }
468
469        let result_value = match self.read_result() {
470            Ok(value) => value,
471            Err(error) => {
472                let mut result = RunResult::error(
473                    RunStatus::ValidationError,
474                    error,
475                    truncate_stdout(state.stdout, &self.limits),
476                    metrics(started, state.records.len() as u32),
477                );
478                result.stderr = truncate_stdout(state.stderr, &self.limits);
479                result.external_calls = state.records;
480                return result;
481            }
482        };
483
484        let mut result = RunResult::ok(
485            result_value,
486            truncate_stdout(state.stdout, &self.limits),
487            metrics(started, state.records.len() as u32),
488        );
489        result.stderr = truncate_stdout(state.stderr, &self.limits);
490        result.external_calls = state.records;
491        if request.return_snapshot {
492            result.snapshot_id = Some(format!("snap_{}", digest_bytes(self.id.0.as_bytes())));
493        }
494        result
495    }
496
497    fn reset_run_state(&mut self, registry: &ToolRegistry) {
498        let op_state = self.runtime.op_state();
499        let mut state = op_state.borrow_mut();
500        let data = state.borrow_mut::<DenoOpState>();
501        data.registry = registry.clone();
502        data.limits = self.limits.clone();
503        data.stdout.clear();
504        data.stderr.clear();
505        data.records.clear();
506        data.started_calls = 0;
507        data.error = None;
508    }
509
510    fn run_state_snapshot(&self) -> RunStateSnapshot {
511        let op_state = self.runtime.op_state();
512        let state = op_state.borrow();
513        let data = state.borrow::<DenoOpState>();
514        RunStateSnapshot {
515            stdout: data.stdout.clone(),
516            stderr: data.stderr.clone(),
517            records: data.records.clone(),
518            error: data.error.clone(),
519        }
520    }
521
522    fn inject_inputs(&mut self, inputs: &Map<String, Value>) -> Result<(), ErrorObject> {
523        if inputs.is_empty() {
524            return Ok(());
525        }
526        let inputs = serde_json::to_string(inputs).map_err(|err| {
527            ErrorObject::new("INVALID_ARGUMENT", format!("inputs are not JSON: {err}"))
528        })?;
529        execute_script_unit(
530            &mut self.runtime,
531            "<langshell-inputs>",
532            format!("globalThis.__langshell_restore_globals({inputs});"),
533        )
534    }
535
536    async fn execute_user_code(
537        &mut self,
538        code: &str,
539        timeout: Duration,
540    ) -> Result<(), ErrorObject> {
541        let timed_out = Arc::new(AtomicBool::new(false));
542        let timer_done = Arc::new((StdMutex::new(false), Condvar::new()));
543        let handle = self.runtime.v8_isolate().thread_safe_handle();
544        let timer_timed_out = timed_out.clone();
545        let timer_done_thread = timer_done.clone();
546        let timer = std::thread::spawn(move || {
547            let (lock, cvar) = &*timer_done_thread;
548            let done = lock.lock().expect("timer mutex poisoned");
549            let (done, wait) = cvar
550                .wait_timeout_while(done, timeout, |done| !*done)
551                .expect("timer condvar poisoned");
552            if !*done && wait.timed_out() {
553                timer_timed_out.store(true, Ordering::SeqCst);
554                handle.terminate_execution();
555            }
556        });
557
558        let value = match self
559            .runtime
560            .execute_script("<langshell-run>", code.to_owned())
561        {
562            Ok(value) => value,
563            Err(err) => {
564                stop_timer(timer_done, timer);
565                if timed_out.load(Ordering::SeqCst) {
566                    self.runtime.v8_isolate().cancel_terminate_execution();
567                    return Err(timeout_error());
568                }
569                return Err(error_from_js(err.to_string(), false));
570            }
571        };
572
573        let resolve = self.runtime.resolve(value);
574        let resolved = tokio::time::timeout(
575            timeout,
576            self.runtime
577                .with_event_loop_promise(resolve, PollEventLoopOptions::default()),
578        )
579        .await;
580        stop_timer(timer_done, timer);
581        if timed_out.load(Ordering::SeqCst) {
582            self.runtime.v8_isolate().cancel_terminate_execution();
583            return Err(timeout_error());
584        }
585        match resolved {
586            Ok(Ok(_)) => Ok(()),
587            Ok(Err(err)) => Err(error_from_js(err.to_string(), false)),
588            Err(_) => {
589                self.runtime.v8_isolate().terminate_execution();
590                self.runtime.v8_isolate().cancel_terminate_execution();
591                Err(timeout_error())
592            }
593        }
594    }
595
596    fn read_result(&mut self) -> Result<Option<Value>, ErrorObject> {
597        let value = self
598            .runtime
599            .execute_script("<langshell-result>", "globalThis.result")
600            .map_err(|err| error_from_js(err.to_string(), false))?;
601        v8_to_json(&mut self.runtime, value)
602    }
603
604    fn snapshot_globals(&mut self) -> Result<Value, ErrorObject> {
605        let value = self
606            .runtime
607            .execute_script(
608                "<langshell-snapshot>",
609                "JSON.stringify(globalThis.__langshell_snapshot_globals())",
610            )
611            .map_err(|err| ErrorObject::new("SNAPSHOT_CORRUPT", err.to_string()))?;
612        let globals = v8_to_string(&mut self.runtime, value)?;
613        serde_json::from_str(&globals).map_err(|err| {
614            ErrorObject::new(
615                "SNAPSHOT_CORRUPT",
616                format!("Deno snapshot globals did not produce valid JSON: {err}"),
617            )
618        })
619    }
620
621    fn restore_globals(&mut self, globals: Value) -> Result<(), ErrorObject> {
622        let globals = serde_json::to_string(&globals).map_err(|err| {
623            ErrorObject::new("SNAPSHOT_CORRUPT", format!("Invalid globals: {err}"))
624        })?;
625        execute_script_unit(
626            &mut self.runtime,
627            "<langshell-restore>",
628            format!("globalThis.__langshell_restore_globals({globals});"),
629        )
630        .map_err(|err| ErrorObject::new("SNAPSHOT_CORRUPT", err.message))
631    }
632}
633
634fn stop_timer(timer_done: Arc<(StdMutex<bool>, Condvar)>, timer: std::thread::JoinHandle<()>) {
635    let (lock, cvar) = &*timer_done;
636    if let Ok(mut done) = lock.lock() {
637        *done = true;
638        cvar.notify_one();
639    }
640    let _ = timer.join();
641}
642
643#[derive(Debug, Serialize, Deserialize)]
644struct SnapshotEnvelope {
645    magic: String,
646    version: u32,
647    session_id: String,
648    limits: SessionLimits,
649    globals: Value,
650    capability_digest: String,
651}
652
653struct DenoOpState {
654    registry: ToolRegistry,
655    limits: SessionLimits,
656    stdout: String,
657    stderr: String,
658    records: Vec<ExternalCallRecord>,
659    started_calls: u32,
660    error: Option<ErrorObject>,
661}
662
663impl DenoOpState {
664    fn new(registry: ToolRegistry, limits: SessionLimits) -> Self {
665        Self {
666            registry,
667            limits,
668            stdout: String::new(),
669            stderr: String::new(),
670            records: Vec::new(),
671            started_calls: 0,
672            error: None,
673        }
674    }
675}
676
677struct RunStateSnapshot {
678    stdout: String,
679    stderr: String,
680    records: Vec<ExternalCallRecord>,
681    error: Option<ErrorObject>,
682}
683
684#[op2(fast)]
685pub fn op_print(state: &mut OpState, #[string] msg: &str, is_err: bool) {
686    let data = state.borrow_mut::<DenoOpState>();
687    if is_err {
688        data.stderr.push_str(msg);
689    } else {
690        data.stdout.push_str(msg);
691    }
692}
693
694#[op2]
695#[serde]
696fn op_langshell_call_tool_sync(
697    state: &mut OpState,
698    #[string] name: String,
699    #[serde] args: Vec<serde_json::Value>,
700    #[serde] kwargs: serde_json::Map<String, serde_json::Value>,
701) -> Result<serde_json::Value, JsErrorBox> {
702    let (tool, ctx) = prepare_tool_call(state, name, args, kwargs)?;
703    if tool.async_mode {
704        let error = ErrorObject::new(
705            "TYPE_ERROR",
706            format!(
707                "Tool {} is asynchronous; call it with await.",
708                tool.capability.name
709            ),
710        );
711        state.borrow_mut::<DenoOpState>().error = Some(error.clone());
712        return Err(error_object_to_js(error));
713    }
714    let outcome = futures::executor::block_on(run_tool(tool, ctx));
715    finish_tool_call(state, outcome)
716}
717
718#[op2(async(lazy))]
719#[serde]
720async fn op_langshell_call_tool_async(
721    state: Rc<RefCell<OpState>>,
722    #[string] name: String,
723    #[serde] args: Vec<serde_json::Value>,
724    #[serde] kwargs: serde_json::Map<String, serde_json::Value>,
725) -> Result<serde_json::Value, JsErrorBox> {
726    let (tool, ctx) = {
727        let mut state = state.borrow_mut();
728        prepare_tool_call(&mut state, name, args, kwargs)?
729    };
730    let outcome = run_tool(tool, ctx).await;
731    let mut state = state.borrow_mut();
732    finish_tool_call(&mut state, outcome)
733}
734
735fn langshell_extension(registry: ToolRegistry, limits: SessionLimits) -> Extension {
736    Extension {
737        name: "langshell_deno",
738        ops: std::borrow::Cow::Owned(vec![
739            op_langshell_call_tool_sync(),
740            op_langshell_call_tool_async(),
741        ]),
742        middleware_fn: Some(Box::new(|op| match op.name {
743            "op_print" => op_print(),
744            _ => op,
745        })),
746        op_state_fn: Some(Box::new(move |state| {
747            state.put(DenoOpState::new(registry, limits));
748        })),
749        ..Default::default()
750    }
751}
752
753fn install_tool_globals(
754    runtime: &mut JsRuntime,
755    registry: &ToolRegistry,
756) -> Result<(), ErrorObject> {
757    let tools: Vec<_> = registry
758        .names()
759        .into_iter()
760        .filter_map(|name| {
761            registry.get(&name).map(|tool| {
762                json!({
763                    "name": name,
764                    "asyncMode": tool.async_mode,
765                })
766            })
767        })
768        .collect();
769    let tools = serde_json::to_string(&tools)
770        .map_err(|err| ErrorObject::new("SERIALIZE_ERROR", format!("tool metadata: {err}")))?;
771    let script = format!(
772        r#"
773const __langshellToolDefs = {tools};
774const __langshellOps = Deno.core.ops;
775Object.defineProperty(globalThis, "__langshell_ops", {{ value: __langshellOps, configurable: false }});
776Object.defineProperty(globalThis, "LangShell", {{
777  value: Object.freeze({{
778    callTool: (name, args = [], kwargs = {{}}) => __langshellOps.op_langshell_call_tool_async(name, args, kwargs),
779    callToolSync: (name, args = [], kwargs = {{}}) => __langshellOps.op_langshell_call_tool_sync(name, args, kwargs),
780  }}),
781  configurable: true,
782}});
783for (const tool of __langshellToolDefs) {{
784  const call = tool.asyncMode
785    ? (...args) => __langshellOps.op_langshell_call_tool_async(tool.name, args, {{}})
786    : (...args) => __langshellOps.op_langshell_call_tool_sync(tool.name, args, {{}});
787  Object.defineProperty(globalThis, tool.name, {{ value: call, writable: false, configurable: true }});
788}}
789Object.defineProperty(globalThis, "__langshell_restore_globals", {{
790  value: (globals) => {{
791    for (const [key, value] of Object.entries(globals ?? {{}})) {{
792      Object.defineProperty(globalThis, key, {{ value, writable: true, configurable: true }});
793    }}
794  }},
795  configurable: false,
796}});
797Object.defineProperty(globalThis, "__langshell_snapshot_globals", {{
798  value: () => {{
799    const out = {{}};
800    for (const key of Object.getOwnPropertyNames(globalThis)) {{
801      if (globalThis.__langshell_baseline.has(key) || key.startsWith("__langshell")) continue;
802      const value = globalThis[key];
803      if (typeof value === "function" || typeof value === "symbol" || typeof value === "undefined" || typeof value === "bigint") continue;
804      try {{
805        JSON.stringify(value);
806        out[key] = value;
807      }} catch (_) {{}}
808    }}
809    return out;
810  }},
811  configurable: false,
812}});
813Object.defineProperty(globalThis, "__langshell_baseline", {{
814  value: new Set(Object.getOwnPropertyNames(globalThis)),
815  configurable: false,
816}});
817try {{
818  Object.defineProperty(globalThis, "Deno", {{ value: undefined, writable: false, configurable: true }});
819}} catch (_) {{}}
820"#
821    );
822    execute_script_unit(runtime, "<langshell-bootstrap>", script)
823}
824
825fn prepare_tool_call(
826    state: &mut OpState,
827    name: String,
828    args: Vec<Value>,
829    kwargs: Map<String, Value>,
830) -> Result<(langshell_core::RegisteredTool, ToolCallContext), JsErrorBox> {
831    let data = state.borrow_mut::<DenoOpState>();
832    data.started_calls = data.started_calls.saturating_add(1);
833    if data.started_calls > data.limits.max_external_calls {
834        let error = ErrorObject::new(
835            "EXTERNAL_CALLS_EXCEEDED",
836            format!(
837                "External call limit {} exceeded.",
838                data.limits.max_external_calls
839            ),
840        );
841        data.error = Some(error.clone());
842        return Err(error_object_to_js(error));
843    }
844    let Some(tool) = data.registry.get(&name).cloned() else {
845        let error = ErrorObject::new(
846            "UNKNOWN_TOOL",
847            format!("Function {name} is not registered."),
848        )
849        .with_hint("Call list_tools() or describe_tool() to inspect registered functions.");
850        data.error = Some(error.clone());
851        return Err(error_object_to_js(error));
852    };
853    Ok((tool, ToolCallContext { name, args, kwargs }))
854}
855
856async fn run_tool(
857    tool: langshell_core::RegisteredTool,
858    ctx: ToolCallContext,
859) -> (Result<Value, ErrorObject>, ExternalCallRecord) {
860    let started = Instant::now();
861    let request_digest = digest_json(&json!({"args": ctx.args, "kwargs": ctx.kwargs}));
862    let side_effect = tool.capability.side_effect;
863    let name = tool.capability.name.clone();
864    match tool.call(ctx).await {
865        Ok(value) => {
866            let response_digest = Some(digest_json(&value));
867            (
868                Ok(value),
869                ExternalCallRecord {
870                    name,
871                    side_effect,
872                    duration_ms: elapsed_ms(started),
873                    status: CallStatus::Ok,
874                    request_digest,
875                    response_digest,
876                    error: None,
877                },
878            )
879        }
880        Err(error) => {
881            let error_object = ErrorObject::new(error.code, error.message);
882            (
883                Err(error_object.clone()),
884                ExternalCallRecord {
885                    name,
886                    side_effect,
887                    duration_ms: elapsed_ms(started),
888                    status: CallStatus::Error,
889                    request_digest,
890                    response_digest: None,
891                    error: Some(error_object),
892                },
893            )
894        }
895    }
896}
897
898fn finish_tool_call(
899    state: &mut OpState,
900    outcome: (Result<Value, ErrorObject>, ExternalCallRecord),
901) -> Result<Value, JsErrorBox> {
902    let (result, record) = outcome;
903    let data = state.borrow_mut::<DenoOpState>();
904    data.records.push(record);
905    match result {
906        Ok(value) => Ok(value),
907        Err(error) => {
908            data.error = Some(error.clone());
909            Err(error_object_to_js(error))
910        }
911    }
912}
913
914fn validate_request(request: &RunRequest, registry: &ToolRegistry) -> RunResult {
915    let started = Instant::now();
916    if let Some(error) = static_validation_error(&request.code, registry) {
917        return RunResult::error(
918            code_to_status(&error.code, true),
919            error,
920            String::new(),
921            metrics(started, 0),
922        );
923    }
924    match transpile_typescript(&request.code) {
925        Ok(_) => RunResult::ok(None, String::new(), metrics(started, 0)),
926        Err(error) => RunResult::error(
927            code_to_status(&error.code, true),
928            error,
929            String::new(),
930            metrics(started, 0),
931        ),
932    }
933}
934
935fn transpile_typescript(code: &str) -> Result<String, ErrorObject> {
936    let specifier = deno_core::resolve_url("file:///langshell-run.ts")
937        .map_err(|err| ErrorObject::new("RUNTIME_ERROR", err.to_string()))?;
938    let parsed = deno_ast::parse_module(ParseParams {
939        specifier,
940        text: code.to_owned().into(),
941        media_type: MediaType::TypeScript,
942        capture_tokens: false,
943        scope_analysis: false,
944        maybe_syntax: None,
945    })
946    .map_err(|err| ErrorObject::new("SYNTAX_ERROR", err.to_string()))?;
947    let transpiled = parsed
948        .transpile(
949            &deno_ast::TranspileOptions {
950                imports_not_used_as_values: deno_ast::ImportsNotUsedAsValues::Remove,
951                decorators: deno_ast::DecoratorsTranspileOption::Ecma,
952                ..Default::default()
953            },
954            &deno_ast::TranspileModuleOptions { module_kind: None },
955            &deno_ast::EmitOptions {
956                source_map: SourceMapOption::None,
957                inline_sources: false,
958                ..Default::default()
959            },
960        )
961        .map_err(|err| ErrorObject::new("SYNTAX_ERROR", err.to_string()))?;
962    Ok(transpiled.into_source().text)
963}
964
965fn wrap_user_code(js: &str) -> String {
966    format!(
967        r#"
968(async () => {{
969  with (globalThis) {{
970{js}
971  }}
972}})()
973"#
974    )
975}
976
977fn v8_to_json(
978    runtime: &mut JsRuntime,
979    value: v8::Global<v8::Value>,
980) -> Result<Option<Value>, ErrorObject> {
981    deno_core::scope!(scope, runtime);
982    let local = v8::Local::new(scope, value);
983    if local.is_undefined() {
984        return Ok(None);
985    }
986    serde_v8::from_v8::<Value>(scope, local)
987        .map(Some)
988        .map_err(|err| {
989            ErrorObject::new(
990                "RESULT_NOT_SERIALIZABLE",
991                format!("TypeScript result could not be converted to JSON: {err}"),
992            )
993            .with_hint("Assign result to a JSON-compatible value before returning.")
994        })
995}
996
997fn v8_to_string(
998    runtime: &mut JsRuntime,
999    value: v8::Global<v8::Value>,
1000) -> Result<String, ErrorObject> {
1001    deno_core::scope!(scope, runtime);
1002    let local = v8::Local::new(scope, value);
1003    serde_v8::from_v8::<String>(scope, local).map_err(|err| {
1004        ErrorObject::new(
1005            "RESULT_NOT_SERIALIZABLE",
1006            format!("TypeScript value could not be converted to string: {err}"),
1007        )
1008    })
1009}
1010
1011fn execute_script_unit(
1012    runtime: &mut JsRuntime,
1013    name: &'static str,
1014    source: String,
1015) -> Result<(), ErrorObject> {
1016    runtime
1017        .execute_script(name, source)
1018        .map(|_| ())
1019        .map_err(|err| error_from_js(err.to_string(), false))
1020}
1021
1022fn static_validation_error(code: &str, registry: &ToolRegistry) -> Option<ErrorObject> {
1023    let unsupported = [
1024        "import ",
1025        "export ",
1026        "import(",
1027        "require(",
1028        "Deno.",
1029        "Deno[",
1030        "fetch(",
1031        "XMLHttpRequest",
1032        "WebSocket",
1033        "process.",
1034        "Bun.",
1035        "eval(",
1036        "Function(",
1037    ];
1038    unsupported
1039        .iter()
1040        .find(|pattern| code.contains(**pattern))
1041        .map(|pattern| {
1042            ErrorObject::new(
1043                "UNSUPPORTED_FEATURE",
1044                format!("Use of {pattern:?} is not supported in the LangShell Deno sandbox."),
1045            )
1046            .with_hint(
1047                "Use a registered capability such as read_text, fetch_json, or list_tools instead.",
1048            )
1049        })
1050        .or_else(|| {
1051            let suspicious = ["fetch_url", "query_db", "send_email"];
1052            suspicious
1053                .iter()
1054                .find(|name| code.contains(&format!("{name}(")) && !registry.contains(name))
1055                .map(|name| {
1056                    ErrorObject::new(
1057                        "UNKNOWN_TOOL",
1058                        format!("Function {name} is not registered."),
1059                    )
1060                    .with_hint("Call list_tools() to inspect available capabilities.")
1061                })
1062        })
1063}
1064
1065fn effective_limits(default_limits: &SessionLimits, request: &RunRequest) -> SessionLimits {
1066    let mut limits = request
1067        .limits
1068        .clone()
1069        .unwrap_or_else(|| default_limits.clone());
1070    if let Some(timeout_ms) = request.timeout_ms {
1071        limits.wall_ms = timeout_ms;
1072    }
1073    limits
1074}
1075
1076fn code_to_status(code: &str, validation: bool) -> RunStatus {
1077    match code {
1078        "PERMISSION_DENIED" => RunStatus::PermissionDenied,
1079        "WAITING_FOR_APPROVAL" => RunStatus::WaitingForApproval,
1080        "TIMEOUT_WALL" | "TIMEOUT_CPU" | "TIMEOUT_TOOL" => RunStatus::Timeout,
1081        "CANCELLED" => RunStatus::Cancelled,
1082        "MEMORY_EXCEEDED" | "STDOUT_EXCEEDED" | "EXTERNAL_CALLS_EXCEEDED" | "STACK_OVERFLOW" => {
1083            RunStatus::ResourceExhausted
1084        }
1085        "SYNTAX_ERROR"
1086        | "TYPE_ERROR"
1087        | "UNKNOWN_TOOL"
1088        | "UNSUPPORTED_FEATURE"
1089        | "RESULT_NOT_SERIALIZABLE"
1090        | "SNAPSHOT_VERSION_MISMATCH"
1091        | "SNAPSHOT_CAPABILITY_MISMATCH"
1092        | "SNAPSHOT_CORRUPT" => RunStatus::ValidationError,
1093        _ if validation => RunStatus::ValidationError,
1094        _ => RunStatus::RuntimeError,
1095    }
1096}
1097
1098fn error_from_js(message: String, validation: bool) -> ErrorObject {
1099    let code = if validation || message.contains("SyntaxError") {
1100        "SYNTAX_ERROR"
1101    } else if message.contains("execution terminated") {
1102        "TIMEOUT_WALL"
1103    } else {
1104        "RUNTIME_ERROR"
1105    };
1106    ErrorObject::new(code, message)
1107}
1108
1109fn error_object_to_js(error: ErrorObject) -> JsErrorBox {
1110    JsErrorBox::generic(format!("{}: {}", error.code, error.message))
1111}
1112
1113fn timeout_error() -> ErrorObject {
1114    ErrorObject::new(
1115        "TIMEOUT_WALL",
1116        "TypeScript execution exceeded the wall-clock limit.",
1117    )
1118}
1119
1120fn runtime_error_result(error: ErrorObject) -> RunResult {
1121    RunResult::error(
1122        RunStatus::RuntimeError,
1123        error,
1124        String::new(),
1125        Metrics::default(),
1126    )
1127}
1128
1129fn worker_closed_error() -> ErrorObject {
1130    ErrorObject::new("RUNTIME_ERROR", "LangShell Deno worker is not available.")
1131}
1132
1133fn truncate_stdout(mut stdout: String, limits: &SessionLimits) -> String {
1134    let max = limits.max_stdout_bytes as usize;
1135    if stdout.len() > max {
1136        stdout.truncate(max);
1137    }
1138    stdout
1139}
1140
1141fn metrics(started: Instant, external_calls_count: u32) -> Metrics {
1142    Metrics {
1143        duration_ms: elapsed_ms(started),
1144        memory_peak_bytes: 0,
1145        instructions: 0,
1146        external_calls_count,
1147    }
1148}
1149
1150fn elapsed_ms(started: Instant) -> u32 {
1151    u32::try_from(started.elapsed().as_millis()).unwrap_or(u32::MAX)
1152}
1153
1154fn capability_digest(registry: &ToolRegistry) -> String {
1155    digest_json(&json!(registry.names()))
1156}
1157
1158#[cfg(test)]
1159mod tests {
1160    use super::*;
1161    use langshell_core::{Capability, RegisteredTool, SideEffect};
1162
1163    #[tokio::test]
1164    async fn runs_typescript_and_reuses_state() {
1165        let runtime = DenoRuntime::new(ToolRegistry::new(), SessionLimits::default());
1166        let mut first = RunRequest::new("s1", "cache = { k: 1 }").unwrap();
1167        first.language = Language::TypeScript;
1168        assert_eq!(runtime.run(first).await.status, RunStatus::Ok);
1169
1170        let mut second = RunRequest::new("s1", "result = cache.k + 1").unwrap();
1171        second.language = Language::TypeScript;
1172        let result = runtime.run(second).await;
1173        assert_eq!(result.status, RunStatus::Ok, "{result:?}");
1174        assert_eq!(result.result, Some(json!(2)));
1175    }
1176
1177    #[tokio::test]
1178    async fn supports_async_external_function() {
1179        let mut registry = ToolRegistry::new();
1180        registry
1181            .register(RegisteredTool::asynchronous(
1182                Capability::new("fetch_json", "test fetch", SideEffect::Network),
1183                |ctx| {
1184                    Box::pin(async move {
1185                        Ok(json!({
1186                            "url": ctx.args.first().and_then(Value::as_str).unwrap_or_default(),
1187                        }))
1188                    })
1189                },
1190            ))
1191            .unwrap();
1192        let runtime = DenoRuntime::new(registry, SessionLimits::default());
1193        let mut request = RunRequest::new(
1194            "s1",
1195            r#"
1196data = await fetch_json("https://api.example.com/item")
1197result = { url: data.url }
1198"#,
1199        )
1200        .unwrap();
1201        request.language = Language::TypeScript;
1202        let result = runtime.run(request).await;
1203        assert_eq!(result.status, RunStatus::Ok, "{result:?}");
1204        assert_eq!(
1205            result.result,
1206            Some(json!({"url": "https://api.example.com/item"}))
1207        );
1208        assert_eq!(result.metrics.external_calls_count, 1);
1209    }
1210
1211    #[tokio::test]
1212    async fn snapshots_json_globals() {
1213        let runtime = DenoRuntime::new(ToolRegistry::new(), SessionLimits::default());
1214        let mut first = RunRequest::new("s1", "state = { step: 1 }").unwrap();
1215        first.language = Language::TypeScript;
1216        assert_eq!(runtime.run(first).await.status, RunStatus::Ok);
1217
1218        let snapshot = runtime
1219            .snapshot_session(&SessionId("s1".to_owned()))
1220            .await
1221            .unwrap();
1222        runtime
1223            .restore_session(&snapshot, Some(SessionId("s2".to_owned())))
1224            .await
1225            .unwrap();
1226
1227        let mut second = RunRequest::new("s2", "result = state.step").unwrap();
1228        second.language = Language::TypeScript;
1229        let result = runtime.run(second).await;
1230        assert_eq!(result.status, RunStatus::Ok, "{result:?}");
1231        assert_eq!(result.result, Some(json!(1)));
1232    }
1233}