Skip to main content

algocline_engine/
executor.rs

1//! Lua execution engine.
2//!
3//! Orchestrates StdLib injection and Lua execution for each session:
4//!
5//! 1. **Layer 0** — [`bridge::register`] injects Rust-backed `alc.*` primitives
6//! 2. **Layer 1** — [`PRELUDE`] adds Lua-based combinators (`alc.map`, etc.)
7//! 3. **Layer 2** — [`mlua_pkg::Registry`] makes `require("ucb")` etc.
8//!    resolve from `~/.algocline/packages/`
9//!
10//! ## Execution models
11//!
12//! - **`eval_simple`** — sync eval on a shared VM (no LLM bridge).
13//!   For lightweight ops like reading package metadata.
14//! - **`start_session`** — spawns a **dedicated VM per session**.
15//!   Each session gets an isolated Lua VM so concurrent sessions
16//!   cannot interfere with each other's globals (`alc`, `ctx`).
17//!   `alc.llm()` yields the coroutine, and the VM is cleaned up
18//!   when the session completes or is abandoned.
19
20use std::path::PathBuf;
21
22use algocline_core::{Budget, ExecutionMetrics, ExecutionSpec};
23use mlua::LuaSerdeExt;
24use mlua_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
25use mlua_pkg::Registry;
26
27use crate::bridge;
28use crate::llm_bridge::LlmRequest;
29use crate::resolver_factory::make_resolver;
30use crate::session::Session;
31use crate::variant_pkg::{register_variant_pkgs, VariantPkg};
32
33/// Layer 1: Prelude combinators (map, reduce, vote, filter).
34/// Embedded at compile time and loaded into every session.
35const PRELUDE: &str = include_str!("prelude.lua");
36
37/// Lua execution engine.
38///
39/// Holds a **shared VM** for lightweight stateless operations (`eval_simple`)
40/// and spawns **per-session VMs** for coroutine-based execution (`start_session`).
41///
42/// Per-session VMs eliminate global namespace pollution between concurrent
43/// sessions — each session's `alc`, `ctx`, and `package.loaded` are fully
44/// isolated.
45pub struct Executor {
46    /// Shared VM for eval_simple (stateless, no session globals).
47    isle: AsyncIsle,
48    _driver: AsyncIsleDriver,
49    /// Package resolver paths, cloned into each per-session VM.
50    lib_paths: Vec<PathBuf>,
51}
52
53impl Executor {
54    pub async fn new(lib_paths: Vec<PathBuf>) -> anyhow::Result<Self> {
55        let paths_for_shared = lib_paths.clone();
56        let (isle, driver) = AsyncIsle::spawn(move |lua| {
57            let mut reg = Registry::new();
58            for path in &paths_for_shared {
59                if let Some(resolver) = make_resolver(path) {
60                    reg.add(resolver);
61                }
62            }
63            reg.install(lua)?;
64            Ok(())
65        })
66        .await?;
67
68        Ok(Self {
69            isle,
70            _driver: driver,
71            lib_paths,
72        })
73    }
74
75    /// Evaluate Lua code without LLM bridge. For lightweight operations
76    /// like reading package metadata.
77    ///
78    /// Uses the shared VM. `extra_lib_paths` must be empty — use
79    /// [`eval_simple_with_paths`] when project-local paths are needed.
80    pub async fn eval_simple(&self, code: String) -> Result<serde_json::Value, String> {
81        self.eval_simple_with_paths(code, vec![], vec![]).await
82    }
83
84    /// Evaluate Lua code without LLM bridge, with optional extra package paths
85    /// and variant pkgs.
86    ///
87    /// When both `extra_lib_paths` and `variant_pkgs` are empty, reuses the
88    /// shared VM (cheap). When either is non-empty, spawns a dedicated VM so
89    /// the extra resolvers are active (slightly more expensive, but `pkg_list`
90    /// is the only caller and it is low-frequency).
91    pub async fn eval_simple_with_paths(
92        &self,
93        code: String,
94        extra_lib_paths: Vec<PathBuf>,
95        variant_pkgs: Vec<VariantPkg>,
96    ) -> Result<serde_json::Value, String> {
97        if extra_lib_paths.is_empty() && variant_pkgs.is_empty() {
98            // Fast path: reuse the long-lived shared VM.
99            let task = self.isle.spawn_exec(move |lua| {
100                let result: mlua::Value = lua
101                    .load(&code)
102                    .eval()
103                    .map_err(|e| IsleError::Lua(e.to_string()))?;
104                let json: serde_json::Value = lua
105                    .from_value(result)
106                    .map_err(|e| IsleError::Lua(e.to_string()))?;
107                serde_json::to_string(&json)
108                    .map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
109            });
110            let json_str = task.await.map_err(|e| e.to_string())?;
111            return serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"));
112        }
113
114        // Slow path: spawn a dedicated VM with extra resolvers prepended.
115        let mut effective = extra_lib_paths;
116        effective.extend(self.lib_paths.iter().cloned());
117
118        let (tmp_isle, _tmp_driver) = AsyncIsle::spawn(move |lua| {
119            let mut reg = Registry::new();
120            // Variant pkgs first so alc.local.toml overrides win over global.
121            register_variant_pkgs(&mut reg, &variant_pkgs);
122            for path in &effective {
123                if let Some(resolver) = make_resolver(path) {
124                    reg.add(resolver);
125                }
126            }
127            reg.install(lua)?;
128            Ok(())
129        })
130        .await
131        .map_err(|e| format!("eval_simple VM spawn failed: {e}"))?;
132
133        let task = tmp_isle.spawn_exec(move |lua| {
134            let result: mlua::Value = lua
135                .load(&code)
136                .eval()
137                .map_err(|e| IsleError::Lua(e.to_string()))?;
138            let json: serde_json::Value = lua
139                .from_value(result)
140                .map_err(|e| IsleError::Lua(e.to_string()))?;
141            serde_json::to_string(&json).map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
142        });
143
144        let json_str = task.await.map_err(|e| e.to_string())?;
145        serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"))
146    }
147
148    /// Start a new Lua execution session on a **dedicated VM**.
149    ///
150    /// Each session gets its own Lua VM (OS thread + mlua instance) so
151    /// concurrent sessions cannot interfere with each other's globals.
152    /// The VM is cleaned up automatically when the session completes or
153    /// is abandoned (all senders drop → channel closes → thread exits).
154    ///
155    /// `extra_lib_paths` are prepended to `self.lib_paths` so project-local
156    /// packages take precedence over the global package directory.
157    /// `variant_pkgs` come from `alc.local.toml` and override both layers
158    /// (registered at the highest priority).
159    pub async fn start_session(
160        &self,
161        code: String,
162        ctx: serde_json::Value,
163        extra_lib_paths: Vec<PathBuf>,
164        variant_pkgs: Vec<VariantPkg>,
165    ) -> Result<Session, String> {
166        let spec = ExecutionSpec::new(code, ctx);
167        let metrics = ExecutionMetrics::new();
168
169        // Extract and apply budget from ctx.budget
170        if let Some(budget) = Budget::from_ctx(&spec.ctx) {
171            metrics.set_budget(budget);
172        }
173
174        let (llm_tx, llm_rx) = tokio::sync::mpsc::channel::<LlmRequest>(16);
175
176        // Build effective lib_paths: extra (project-local) first, then defaults.
177        // Priority: variant_pkgs > extra_lib_paths > self.lib_paths
178        // (ALC_PACKAGES_PATH + global default).
179        let mut effective = extra_lib_paths;
180        effective.extend(self.lib_paths.iter().cloned());
181
182        let bridge_config = bridge::BridgeConfig {
183            llm_tx: Some(llm_tx),
184            ns: spec.namespace.clone(),
185            custom_metrics: metrics.custom_metrics_handle(),
186            budget: metrics.budget_handle(),
187            progress: metrics.progress_handle(),
188            lib_paths: effective.clone(), // fork child VMs inherit project paths
189            variant_pkgs: variant_pkgs.clone(), // fork child VMs inherit variant overrides
190        };
191        let lua_ctx = spec.ctx.clone();
192        let lua_code = spec.code.clone();
193
194        // 1. Spawn a dedicated VM for this session.
195        let (session_isle, session_driver) = AsyncIsle::spawn(move |lua| {
196            let mut reg = Registry::new();
197            // Variant pkgs first so alc.local.toml overrides win over global.
198            register_variant_pkgs(&mut reg, &variant_pkgs);
199            for path in &effective {
200                if let Some(resolver) = make_resolver(path) {
201                    reg.add(resolver);
202                }
203            }
204            reg.install(lua)?;
205            Ok(())
206        })
207        .await
208        .map_err(|e| format!("Session VM spawn failed: {e}"))?;
209
210        // 2. Setup: register alc.* StdLib, set ctx, load prelude.
211        //    Safe to set globals — this VM is exclusively ours.
212        session_isle
213            .exec(move |lua| {
214                let alc_table = lua.create_table()?;
215                bridge::register(lua, &alc_table, bridge_config)?;
216                lua.globals().set("alc", alc_table)?;
217
218                let ctx_value = lua.to_value(&lua_ctx)?;
219                lua.globals().set("ctx", ctx_value)?;
220
221                lua.load(PRELUDE)
222                    .exec()
223                    .map_err(|e| IsleError::Lua(format!("Prelude load failed: {e}")))?;
224
225                // No need to clear package.loaded — fresh VM.
226
227                Ok("ok".to_string())
228            })
229            .await
230            .map_err(|e| format!("Session setup failed: {e}"))?;
231
232        // 3. Execute user code as a coroutine on the session VM.
233        let wrapped_code = format!("return alc.json_encode((function()\n{lua_code}\nend)())");
234        let exec_task = session_isle.spawn_coroutine_eval(&wrapped_code);
235
236        // Handle no longer needed — all requests have been sent.
237        // The driver keeps the channel alive until the session completes.
238        drop(session_isle);
239
240        Ok(Session::new(llm_rx, exec_task, metrics, session_driver))
241    }
242}
243
244#[cfg(test)]
245mod tests {
246    use super::*;
247    use std::fs;
248
249    /// Create a temporary package directory with the given name and `init.lua` content.
250    fn make_pkg_dir(parent: &std::path::Path, pkg_name: &str, init_lua: &str) -> PathBuf {
251        let pkg_dir = parent.join(pkg_name);
252        fs::create_dir_all(&pkg_dir).unwrap();
253        fs::write(pkg_dir.join("init.lua"), init_lua).unwrap();
254        parent.to_path_buf()
255    }
256
257    /// `extra_lib_paths=vec![]` — eval_simple must work as before.
258    #[tokio::test]
259    async fn no_extra_lib_paths_eval_simple() {
260        let executor = Executor::new(vec![]).await.unwrap();
261        let result = executor.eval_simple("return 42".to_string()).await.unwrap();
262        assert_eq!(result, serde_json::json!(42));
263    }
264
265    /// `eval_simple_with_paths` with a project-local package.
266    ///
267    /// Creates a temp dir with `test_pkg/init.lua` returning `{value = 99}`,
268    /// then verifies `require("test_pkg").value` == 99 via the extra resolver.
269    #[tokio::test]
270    async fn extra_lib_paths_reachable_via_eval_simple_with_paths() {
271        let tmp = tempfile::tempdir().unwrap();
272        let pkg_root = make_pkg_dir(tmp.path(), "test_pkg", "return { value = 99 }");
273
274        let executor = Executor::new(vec![]).await.unwrap();
275        let code = r#"
276            local pkg = require("test_pkg")
277            return pkg.value
278        "#
279        .to_string();
280
281        let result = executor
282            .eval_simple_with_paths(code, vec![pkg_root], vec![])
283            .await
284            .unwrap();
285
286        assert_eq!(result, serde_json::json!(99));
287    }
288
289    /// Variant pkg with a non-matching directory name resolves via
290    /// `VariantRootResolver` + `PrefixResolver`.
291    #[tokio::test]
292    async fn variant_pkg_resolves_root_and_submodule() {
293        let tmp = tempfile::tempdir().unwrap();
294        // pkg dir name (`physical-dir`) intentionally differs from the
295        // require name (`logical_name`) — variant scope must support this.
296        let pkg_dir = tmp.path().join("physical-dir");
297        fs::create_dir_all(&pkg_dir).unwrap();
298        fs::write(
299            pkg_dir.join("init.lua"),
300            "return { greet = function(n) return 'hi-' .. n end, sub = require('logical_name.sub') }",
301        )
302        .unwrap();
303        fs::write(pkg_dir.join("sub.lua"), "return { value = 7 }").unwrap();
304
305        let executor = Executor::new(vec![]).await.unwrap();
306        let code = r#"
307            local pkg = require("logical_name")
308            return { msg = pkg.greet("there"), sub_value = pkg.sub.value }
309        "#
310        .to_string();
311
312        let result = executor
313            .eval_simple_with_paths(code, vec![], vec![VariantPkg::new("logical_name", pkg_dir)])
314            .await
315            .unwrap();
316
317        assert_eq!(result["msg"], serde_json::json!("hi-there"));
318        assert_eq!(result["sub_value"], serde_json::json!(7));
319    }
320
321    /// Variant pkg overrides a same-name global pkg (priority: variant > global).
322    #[tokio::test]
323    async fn variant_pkg_overrides_global_same_name() {
324        let global_tmp = tempfile::tempdir().unwrap();
325        let variant_tmp = tempfile::tempdir().unwrap();
326
327        // Global: my_pkg returns 1
328        make_pkg_dir(global_tmp.path(), "my_pkg", "return { value = 1 }");
329        // Variant: my_pkg returns 2 — must win
330        let variant_dir = variant_tmp.path().join("my_pkg");
331        fs::create_dir_all(&variant_dir).unwrap();
332        fs::write(variant_dir.join("init.lua"), "return { value = 2 }").unwrap();
333
334        let executor = Executor::new(vec![global_tmp.path().to_path_buf()])
335            .await
336            .unwrap();
337
338        let code = r#"
339            local pkg = require("my_pkg")
340            return pkg.value
341        "#
342        .to_string();
343
344        let result = executor
345            .eval_simple_with_paths(code, vec![], vec![VariantPkg::new("my_pkg", variant_dir)])
346            .await
347            .unwrap();
348
349        assert_eq!(result, serde_json::json!(2));
350    }
351
352    /// When `extra_lib_paths` has a pkg with the same name as one in global paths,
353    /// the extra one takes priority (it is prepended).
354    #[tokio::test]
355    async fn extra_lib_paths_priority_over_default() {
356        let global_tmp = tempfile::tempdir().unwrap();
357        let extra_tmp = tempfile::tempdir().unwrap();
358
359        // Global: test_pkg returns 1
360        make_pkg_dir(global_tmp.path(), "test_pkg", "return { value = 1 }");
361        // Extra (project-local): test_pkg returns 2
362        let extra_root = make_pkg_dir(extra_tmp.path(), "test_pkg", "return { value = 2 }");
363
364        // Executor has global as its lib_paths.
365        let executor = Executor::new(vec![global_tmp.path().to_path_buf()])
366            .await
367            .unwrap();
368
369        let code = r#"
370            local pkg = require("test_pkg")
371            return pkg.value
372        "#
373        .to_string();
374
375        let result = executor
376            .eval_simple_with_paths(code, vec![extra_root], vec![])
377            .await
378            .unwrap();
379
380        // extra (2) must win over global (1)
381        assert_eq!(result, serde_json::json!(2));
382    }
383}