Skip to main content

algocline_engine/
executor.rs

1//! Lua execution engine.
2//!
3//! Orchestrates StdLib injection and Lua execution for each session:
4//!
5//! 1. **Layer 0** — [`bridge::register`] injects Rust-backed `alc.*` primitives
6//! 2. **Layer 1** — [`PRELUDE`] adds Lua-based combinators (`alc.map`, etc.)
7//! 3. **Layer 2** — [`mlua_pkg::Registry`] makes `require("ucb")` etc.
8//!    resolve from `~/.algocline/packages/`
9//!
10//! ## Execution models
11//!
12//! - **`eval_simple`** — sync eval on a shared VM (no LLM bridge).
13//!   For lightweight ops like reading package metadata.
14//! - **`start_session`** — spawns a **dedicated VM per session**.
15//!   Each session gets an isolated Lua VM so concurrent sessions
16//!   cannot interfere with each other's globals (`alc`, `ctx`).
17//!   `alc.llm()` yields the coroutine, and the VM is cleaned up
18//!   when the session completes or is abandoned.
19
20use std::path::PathBuf;
21use std::sync::Arc;
22
23use algocline_core::{Budget, ExecutionMetrics, ExecutionSpec};
24use mlua::LuaSerdeExt;
25use mlua_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
26use mlua_pkg::Registry;
27
28use crate::bridge;
29use crate::card::FileCardStore;
30use crate::llm_bridge::LlmRequest;
31use crate::resolver_factory::make_resolver;
32use crate::session::Session;
33use crate::state::JsonFileStore;
34use crate::variant_pkg::{register_variant_pkgs, VariantPkg};
35
36/// Layer 1: Prelude combinators (map, reduce, vote, filter).
37/// Embedded at compile time and loaded into every session.
38const PRELUDE: &str = include_str!("prelude.lua");
39
40/// Lua execution engine.
41///
42/// Holds a **shared VM** for lightweight stateless operations (`eval_simple`)
43/// and spawns **per-session VMs** for coroutine-based execution (`start_session`).
44///
45/// Per-session VMs eliminate global namespace pollution between concurrent
46/// sessions — each session's `alc`, `ctx`, and `package.loaded` are fully
47/// isolated.
48pub struct Executor {
49    /// Shared VM for eval_simple (stateless, no session globals).
50    isle: AsyncIsle,
51    _driver: AsyncIsleDriver,
52    /// Package resolver paths, cloned into each per-session VM.
53    lib_paths: Vec<PathBuf>,
54}
55
56impl Executor {
57    pub async fn new(lib_paths: Vec<PathBuf>) -> anyhow::Result<Self> {
58        let paths_for_shared = lib_paths.clone();
59        let (isle, driver) = AsyncIsle::spawn(move |lua| {
60            let mut reg = Registry::new();
61            for path in &paths_for_shared {
62                if let Some(resolver) = make_resolver(path) {
63                    reg.add(resolver);
64                }
65            }
66            reg.install(lua)?;
67            Ok(())
68        })
69        .await?;
70
71        Ok(Self {
72            isle,
73            _driver: driver,
74            lib_paths,
75        })
76    }
77
78    /// Evaluate Lua code without LLM bridge. For lightweight operations
79    /// like reading package metadata.
80    ///
81    /// Uses the shared VM. `extra_lib_paths` must be empty — use
82    /// [`Self::eval_simple_with_paths`] when project-local paths are needed.
83    pub async fn eval_simple(&self, code: String) -> Result<serde_json::Value, String> {
84        self.eval_simple_with_paths(code, vec![], vec![]).await
85    }
86
87    /// Evaluate Lua code without LLM bridge, with optional extra package paths
88    /// and variant pkgs.
89    ///
90    /// When both `extra_lib_paths` and `variant_pkgs` are empty, reuses the
91    /// shared VM (cheap). When either is non-empty, spawns a dedicated VM so
92    /// the extra resolvers are active (slightly more expensive, but `pkg_list`
93    /// is the only caller and it is low-frequency).
94    ///
95    /// The fast path does not register `alc.*` bridge primitives, so the
96    /// `state_store` / `card_store` / `scenarios_dir` handles that
97    /// [`Self::start_session`] requires are not threaded through here —
98    /// callers that need them go through `start_session`.
99    pub async fn eval_simple_with_paths(
100        &self,
101        code: String,
102        extra_lib_paths: Vec<PathBuf>,
103        variant_pkgs: Vec<VariantPkg>,
104    ) -> Result<serde_json::Value, String> {
105        if extra_lib_paths.is_empty() && variant_pkgs.is_empty() {
106            // Fast path: reuse the long-lived shared VM.
107            let task = self.isle.spawn_exec(move |lua| {
108                let result: mlua::Value = lua
109                    .load(&code)
110                    .eval()
111                    .map_err(|e| IsleError::Lua(e.to_string()))?;
112                let json: serde_json::Value = lua
113                    .from_value(result)
114                    .map_err(|e| IsleError::Lua(e.to_string()))?;
115                serde_json::to_string(&json)
116                    .map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
117            });
118            let json_str = task.await.map_err(|e| e.to_string())?;
119            return serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"));
120        }
121
122        // Slow path: spawn a dedicated VM with extra resolvers prepended.
123        let mut effective = extra_lib_paths;
124        effective.extend(self.lib_paths.iter().cloned());
125
126        let (tmp_isle, _tmp_driver) = AsyncIsle::spawn(move |lua| {
127            let mut reg = Registry::new();
128            // Variant pkgs first so alc.local.toml overrides win over global.
129            register_variant_pkgs(&mut reg, &variant_pkgs);
130            for path in &effective {
131                if let Some(resolver) = make_resolver(path) {
132                    reg.add(resolver);
133                }
134            }
135            reg.install(lua)?;
136            Ok(())
137        })
138        .await
139        .map_err(|e| format!("eval_simple VM spawn failed: {e}"))?;
140
141        let task = tmp_isle.spawn_exec(move |lua| {
142            let result: mlua::Value = lua
143                .load(&code)
144                .eval()
145                .map_err(|e| IsleError::Lua(e.to_string()))?;
146            let json: serde_json::Value = lua
147                .from_value(result)
148                .map_err(|e| IsleError::Lua(e.to_string()))?;
149            serde_json::to_string(&json).map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
150        });
151
152        let json_str = task.await.map_err(|e| e.to_string())?;
153        serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"))
154    }
155
156    /// Start a new Lua execution session on a **dedicated VM**.
157    ///
158    /// Each session gets its own Lua VM (OS thread + mlua instance) so
159    /// concurrent sessions cannot interfere with each other's globals.
160    /// The VM is cleaned up automatically when the session completes or
161    /// is abandoned (all senders drop → channel closes → thread exits).
162    ///
163    /// `extra_lib_paths` are prepended to `self.lib_paths` so project-local
164    /// packages take precedence over the global package directory.
165    /// `variant_pkgs` come from `alc.local.toml` and override both layers
166    /// (registered at the highest priority).
167    ///
168    /// `state_store` / `card_store` / `scenarios_dir` are resolved by the
169    /// service layer (typically from `AppConfig.app_dir()`) so the engine
170    /// crate never touches HOME. They flow through [`bridge::BridgeConfig`]
171    /// to back `alc.state.*` / `alc.card.*` / `alc._dirs.scenarios`.
172    #[allow(clippy::too_many_arguments)]
173    pub async fn start_session(
174        &self,
175        code: String,
176        ctx: serde_json::Value,
177        extra_lib_paths: Vec<PathBuf>,
178        variant_pkgs: Vec<VariantPkg>,
179        state_store: Arc<JsonFileStore>,
180        card_store: Arc<FileCardStore>,
181        scenarios_dir: PathBuf,
182    ) -> Result<Session, String> {
183        let spec = ExecutionSpec::new(code, ctx);
184        let metrics = ExecutionMetrics::new();
185
186        // Extract and apply budget from ctx.budget
187        if let Some(budget) = Budget::from_ctx(&spec.ctx) {
188            metrics.set_budget(budget);
189        }
190
191        let (llm_tx, llm_rx) = tokio::sync::mpsc::channel::<LlmRequest>(16);
192
193        // Build effective lib_paths: extra (project-local) first, then defaults.
194        // Priority: variant_pkgs > extra_lib_paths > self.lib_paths
195        // (ALC_PACKAGES_PATH + global default).
196        let mut effective = extra_lib_paths;
197        effective.extend(self.lib_paths.iter().cloned());
198
199        let bridge_config = bridge::BridgeConfig {
200            llm_tx: Some(llm_tx),
201            ns: spec.namespace.clone(),
202            custom_metrics: metrics.custom_metrics_handle(),
203            budget: metrics.budget_handle(),
204            progress: metrics.progress_handle(),
205            lib_paths: effective.clone(), // fork child VMs inherit project paths
206            variant_pkgs: variant_pkgs.clone(), // fork child VMs inherit variant overrides
207            state_store,
208            card_store,
209            scenarios_dir,
210        };
211        let lua_ctx = spec.ctx.clone();
212        let lua_code = spec.code.clone();
213
214        // 1. Spawn a dedicated VM for this session.
215        let (session_isle, session_driver) = AsyncIsle::spawn(move |lua| {
216            let mut reg = Registry::new();
217            // Variant pkgs first so alc.local.toml overrides win over global.
218            register_variant_pkgs(&mut reg, &variant_pkgs);
219            for path in &effective {
220                if let Some(resolver) = make_resolver(path) {
221                    reg.add(resolver);
222                }
223            }
224            reg.install(lua)?;
225            Ok(())
226        })
227        .await
228        .map_err(|e| format!("Session VM spawn failed: {e}"))?;
229
230        // 2. Setup: register alc.* StdLib, set ctx, load prelude.
231        //    Safe to set globals — this VM is exclusively ours.
232        session_isle
233            .exec(move |lua| {
234                let alc_table = lua.create_table()?;
235                bridge::register(lua, &alc_table, bridge_config)?;
236                lua.globals().set("alc", alc_table)?;
237
238                let ctx_value = lua.to_value(&lua_ctx)?;
239                lua.globals().set("ctx", ctx_value)?;
240
241                lua.load(PRELUDE)
242                    .exec()
243                    .map_err(|e| IsleError::Lua(format!("Prelude load failed: {e}")))?;
244
245                // Override Lua's standard `print` to redirect output to tracing
246                // instead of stdout.  Stdout is used exclusively by the rmcp
247                // JSON-RPC stdio transport; any stray write from user Lua code
248                // would corrupt the transport framing.
249                //
250                // Behaviour mirrors the standard `print`:
251                //   * multiple args are joined with "\t"
252                //   * each arg is coerced via `tostring`
253                //   * the assembled line is emitted as `tracing::info!` so it
254                //     appears in the log file without reaching stdout
255                //
256                // `io.write` is intentionally left unchanged — scripts that
257                // explicitly target stdout/stderr can still use it.
258                let print_fn = lua.create_function(|lua_inner, args: mlua::MultiValue| {
259                    use mlua::prelude::LuaValue;
260                    let parts: Vec<String> = args
261                        .iter()
262                        .map(|v| match v {
263                            LuaValue::Nil => "nil".to_string(),
264                            LuaValue::Boolean(b) => b.to_string(),
265                            LuaValue::Integer(n) => n.to_string(),
266                            LuaValue::Number(n) => {
267                                // Reproduce Lua's default float formatting: no
268                                // trailing zeros for whole-number values.
269                                if n.fract() == 0.0 && n.abs() < 1e15_f64 {
270                                    format!("{n:.1}")
271                                } else {
272                                    format!("{n}")
273                                }
274                            }
275                            other => lua_inner
276                                .coerce_string(other.clone())
277                                .ok()
278                                .flatten()
279                                .and_then(|s| s.to_str().ok().map(|r| r.to_string()))
280                                .unwrap_or_else(|| format!("{other:?}")),
281                        })
282                        .collect();
283                    let line = parts.join("\t");
284                    tracing::info!(target: "alc.lua.print", "{}", line);
285                    Ok(())
286                })?;
287                lua.globals().set("print", print_fn)?;
288
289                // No need to clear package.loaded — fresh VM.
290
291                Ok("ok".to_string())
292            })
293            .await
294            .map_err(|e| format!("Session setup failed: {e}"))?;
295
296        // 3. Execute user code as a coroutine on the session VM.
297        let wrapped_code = format!("return alc.json_encode((function()\n{lua_code}\nend)())");
298        let exec_task = session_isle.spawn_coroutine_eval(&wrapped_code);
299
300        // Handle no longer needed — all requests have been sent.
301        // The driver keeps the channel alive until the session completes.
302        drop(session_isle);
303
304        Ok(Session::new(llm_rx, exec_task, metrics, session_driver))
305    }
306}
307
308#[cfg(test)]
309mod tests {
310    use super::*;
311    use std::fs;
312
313    /// Create a temporary package directory with the given name and `init.lua` content.
314    fn make_pkg_dir(parent: &std::path::Path, pkg_name: &str, init_lua: &str) -> PathBuf {
315        let pkg_dir = parent.join(pkg_name);
316        fs::create_dir_all(&pkg_dir).unwrap();
317        fs::write(pkg_dir.join("init.lua"), init_lua).unwrap();
318        parent.to_path_buf()
319    }
320
321    /// `extra_lib_paths=vec![]` — eval_simple must work as before.
322    #[tokio::test]
323    async fn no_extra_lib_paths_eval_simple() {
324        let executor = Executor::new(vec![]).await.unwrap();
325        let result = executor.eval_simple("return 42".to_string()).await.unwrap();
326        assert_eq!(result, serde_json::json!(42));
327    }
328
329    /// `eval_simple_with_paths` with a project-local package.
330    ///
331    /// Creates a temp dir with `test_pkg/init.lua` returning `{value = 99}`,
332    /// then verifies `require("test_pkg").value` == 99 via the extra resolver.
333    #[tokio::test]
334    async fn extra_lib_paths_reachable_via_eval_simple_with_paths() {
335        let tmp = tempfile::tempdir().unwrap();
336        let pkg_root = make_pkg_dir(tmp.path(), "test_pkg", "return { value = 99 }");
337
338        let executor = Executor::new(vec![]).await.unwrap();
339        let code = r#"
340            local pkg = require("test_pkg")
341            return pkg.value
342        "#
343        .to_string();
344
345        let result = executor
346            .eval_simple_with_paths(code, vec![pkg_root], vec![])
347            .await
348            .unwrap();
349
350        assert_eq!(result, serde_json::json!(99));
351    }
352
353    /// Variant pkg with a non-matching directory name resolves via
354    /// `VariantRootResolver` + `PrefixResolver`.
355    #[tokio::test]
356    async fn variant_pkg_resolves_root_and_submodule() {
357        let tmp = tempfile::tempdir().unwrap();
358        // pkg dir name (`physical-dir`) intentionally differs from the
359        // require name (`logical_name`) — variant scope must support this.
360        let pkg_dir = tmp.path().join("physical-dir");
361        fs::create_dir_all(&pkg_dir).unwrap();
362        fs::write(
363            pkg_dir.join("init.lua"),
364            "return { greet = function(n) return 'hi-' .. n end, sub = require('logical_name.sub') }",
365        )
366        .unwrap();
367        fs::write(pkg_dir.join("sub.lua"), "return { value = 7 }").unwrap();
368
369        let executor = Executor::new(vec![]).await.unwrap();
370        let code = r#"
371            local pkg = require("logical_name")
372            return { msg = pkg.greet("there"), sub_value = pkg.sub.value }
373        "#
374        .to_string();
375
376        let result = executor
377            .eval_simple_with_paths(code, vec![], vec![VariantPkg::new("logical_name", pkg_dir)])
378            .await
379            .unwrap();
380
381        assert_eq!(result["msg"], serde_json::json!("hi-there"));
382        assert_eq!(result["sub_value"], serde_json::json!(7));
383    }
384
385    /// Variant pkg overrides a same-name global pkg (priority: variant > global).
386    #[tokio::test]
387    async fn variant_pkg_overrides_global_same_name() {
388        let global_tmp = tempfile::tempdir().unwrap();
389        let variant_tmp = tempfile::tempdir().unwrap();
390
391        // Global: my_pkg returns 1
392        make_pkg_dir(global_tmp.path(), "my_pkg", "return { value = 1 }");
393        // Variant: my_pkg returns 2 — must win
394        let variant_dir = variant_tmp.path().join("my_pkg");
395        fs::create_dir_all(&variant_dir).unwrap();
396        fs::write(variant_dir.join("init.lua"), "return { value = 2 }").unwrap();
397
398        let executor = Executor::new(vec![global_tmp.path().to_path_buf()])
399            .await
400            .unwrap();
401
402        let code = r#"
403            local pkg = require("my_pkg")
404            return pkg.value
405        "#
406        .to_string();
407
408        let result = executor
409            .eval_simple_with_paths(code, vec![], vec![VariantPkg::new("my_pkg", variant_dir)])
410            .await
411            .unwrap();
412
413        assert_eq!(result, serde_json::json!(2));
414    }
415
416    /// When `extra_lib_paths` has a pkg with the same name as one in global paths,
417    /// the extra one takes priority (it is prepended).
418    #[tokio::test]
419    async fn extra_lib_paths_priority_over_default() {
420        let global_tmp = tempfile::tempdir().unwrap();
421        let extra_tmp = tempfile::tempdir().unwrap();
422
423        // Global: test_pkg returns 1
424        make_pkg_dir(global_tmp.path(), "test_pkg", "return { value = 1 }");
425        // Extra (project-local): test_pkg returns 2
426        let extra_root = make_pkg_dir(extra_tmp.path(), "test_pkg", "return { value = 2 }");
427
428        // Executor has global as its lib_paths.
429        let executor = Executor::new(vec![global_tmp.path().to_path_buf()])
430            .await
431            .unwrap();
432
433        let code = r#"
434            local pkg = require("test_pkg")
435            return pkg.value
436        "#
437        .to_string();
438
439        let result = executor
440            .eval_simple_with_paths(code, vec![extra_root], vec![])
441            .await
442            .unwrap();
443
444        // extra (2) must win over global (1)
445        assert_eq!(result, serde_json::json!(2));
446    }
447}