Skip to main content

algocline_engine/
executor.rs

1//! Lua execution engine.
2//!
3//! Orchestrates StdLib injection and Lua execution for each session:
4//!
5//! 1. **Layer 0** — [`bridge::register`] injects Rust-backed `alc.*` primitives
6//! 2. **Layer 1** — [`PRELUDE`] adds Lua-based combinators (`alc.map`, etc.)
7//! 3. **Layer 2** — [`mlua_pkg::Registry`] makes `require("ucb")` etc.
8//!    resolve from `~/.algocline/packages/`
9//!
10//! ## Execution models
11//!
12//! - **`eval_simple`** — sync eval on a shared VM (no LLM bridge).
13//!   For lightweight ops like reading package metadata.
14//! - **`start_session`** — spawns a **dedicated VM per session**.
15//!   Each session gets an isolated Lua VM so concurrent sessions
16//!   cannot interfere with each other's globals (`alc`, `ctx`).
17//!   `alc.llm()` yields the coroutine, and the VM is cleaned up
18//!   when the session completes or is abandoned.
19
20use std::collections::HashMap;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use algocline_core::{Budget, ExecutionMetrics, ExecutionSpec};
25use mlua::LuaSerdeExt;
26use mlua_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
27use mlua_pkg::Registry;
28
29use crate::bridge;
30use crate::card::FileCardStore;
31use crate::llm_bridge::LlmRequest;
32use crate::resolver_factory::make_resolver;
33use crate::session::Session;
34use crate::state::JsonFileStore;
35use crate::variant_pkg::{register_variant_pkgs, VariantPkg};
36
37/// Layer 1: Prelude combinators (map, reduce, vote, filter).
38/// Embedded at compile time and loaded into every session.
39const PRELUDE: &str = include_str!("prelude.lua");
40
41/// Lua execution engine.
42///
43/// Holds a **shared VM** for lightweight stateless operations (`eval_simple`)
44/// and spawns **per-session VMs** for coroutine-based execution (`start_session`).
45///
46/// Per-session VMs eliminate global namespace pollution between concurrent
47/// sessions — each session's `alc`, `ctx`, and `package.loaded` are fully
48/// isolated.
49pub struct Executor {
50    /// Shared VM for eval_simple (stateless, no session globals).
51    isle: AsyncIsle,
52    _driver: AsyncIsleDriver,
53    /// Package resolver paths, cloned into each per-session VM.
54    lib_paths: Vec<PathBuf>,
55}
56
57impl Executor {
58    pub async fn new(lib_paths: Vec<PathBuf>) -> anyhow::Result<Self> {
59        let paths_for_shared = lib_paths.clone();
60        let (isle, driver) = AsyncIsle::spawn(move |lua| {
61            let mut reg = Registry::new();
62            for path in &paths_for_shared {
63                if let Some(resolver) = make_resolver(path) {
64                    reg.add(resolver);
65                }
66            }
67            reg.install(lua)?;
68            Ok(())
69        })
70        .await?;
71
72        Ok(Self {
73            isle,
74            _driver: driver,
75            lib_paths,
76        })
77    }
78
79    /// Evaluate Lua code without LLM bridge. For lightweight operations
80    /// like reading package metadata.
81    ///
82    /// Uses the shared VM. `extra_lib_paths` must be empty — use
83    /// [`Self::eval_simple_with_paths`] when project-local paths are needed.
84    pub async fn eval_simple(&self, code: String) -> Result<serde_json::Value, String> {
85        self.eval_simple_with_paths(code, vec![], vec![]).await
86    }
87
88    /// Evaluate Lua code without LLM bridge, with optional extra package paths
89    /// and variant pkgs.
90    ///
91    /// When both `extra_lib_paths` and `variant_pkgs` are empty, reuses the
92    /// shared VM (cheap). When either is non-empty, spawns a dedicated VM so
93    /// the extra resolvers are active (slightly more expensive, but `pkg_list`
94    /// is the only caller and it is low-frequency).
95    ///
96    /// The fast path does not register `alc.*` bridge primitives, so the
97    /// `state_store` / `card_store` / `scenarios_dir` handles that
98    /// [`Self::start_session`] requires are not threaded through here —
99    /// callers that need them go through `start_session`.
100    pub async fn eval_simple_with_paths(
101        &self,
102        code: String,
103        extra_lib_paths: Vec<PathBuf>,
104        variant_pkgs: Vec<VariantPkg>,
105    ) -> Result<serde_json::Value, String> {
106        if extra_lib_paths.is_empty() && variant_pkgs.is_empty() {
107            // Fast path: reuse the long-lived shared VM.
108            let task = self.isle.spawn_exec(move |lua| {
109                let result: mlua::Value = lua
110                    .load(&code)
111                    .eval()
112                    .map_err(|e| IsleError::Lua(e.to_string()))?;
113                let json: serde_json::Value = lua
114                    .from_value(result)
115                    .map_err(|e| IsleError::Lua(e.to_string()))?;
116                serde_json::to_string(&json)
117                    .map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
118            });
119            let json_str = task.await.map_err(|e| e.to_string())?;
120            return serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"));
121        }
122
123        // Slow path: spawn a dedicated VM with extra resolvers prepended.
124        let mut effective = extra_lib_paths;
125        effective.extend(self.lib_paths.iter().cloned());
126
127        let (tmp_isle, _tmp_driver) = AsyncIsle::spawn(move |lua| {
128            let mut reg = Registry::new();
129            // Variant pkgs first so alc.local.toml overrides win over global.
130            register_variant_pkgs(&mut reg, &variant_pkgs);
131            for path in &effective {
132                if let Some(resolver) = make_resolver(path) {
133                    reg.add(resolver);
134                }
135            }
136            reg.install(lua)?;
137            Ok(())
138        })
139        .await
140        .map_err(|e| format!("eval_simple VM spawn failed: {e}"))?;
141
142        let task = tmp_isle.spawn_exec(move |lua| {
143            let result: mlua::Value = lua
144                .load(&code)
145                .eval()
146                .map_err(|e| IsleError::Lua(e.to_string()))?;
147            let json: serde_json::Value = lua
148                .from_value(result)
149                .map_err(|e| IsleError::Lua(e.to_string()))?;
150            serde_json::to_string(&json).map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
151        });
152
153        let json_str = task.await.map_err(|e| e.to_string())?;
154        serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"))
155    }
156
157    /// Start a new Lua execution session on a **dedicated VM**.
158    ///
159    /// Each session gets its own Lua VM (OS thread + mlua instance) so
160    /// concurrent sessions cannot interfere with each other's globals.
161    /// The VM is cleaned up automatically when the session completes or
162    /// is abandoned (all senders drop → channel closes → thread exits).
163    ///
164    /// `extra_lib_paths` are prepended to `self.lib_paths` so project-local
165    /// packages take precedence over the global package directory.
166    /// `variant_pkgs` come from `alc.local.toml` and override both layers
167    /// (registered at the highest priority).
168    ///
169    /// `state_store` / `card_store` / `scenarios_dir` are resolved by the
170    /// service layer (typically from `AppConfig.app_dir()`) so the engine
171    /// crate never touches HOME. They flow through [`bridge::BridgeConfig`]
172    /// to back `alc.state.*` / `alc.card.*` / `alc._dirs.scenarios`.
173    #[allow(clippy::too_many_arguments)]
174    pub async fn start_session(
175        &self,
176        code: String,
177        ctx: serde_json::Value,
178        extra_lib_paths: Vec<PathBuf>,
179        variant_pkgs: Vec<VariantPkg>,
180        state_store: Arc<JsonFileStore>,
181        card_store: Arc<FileCardStore>,
182        scenarios_dir: PathBuf,
183    ) -> Result<Session, String> {
184        let spec = ExecutionSpec::new(code, ctx);
185        let metrics = ExecutionMetrics::new();
186
187        // Extract and apply budget from ctx.budget
188        if let Some(budget) = Budget::from_ctx(&spec.ctx) {
189            metrics.set_budget(budget);
190        }
191
192        let (llm_tx, llm_rx) = tokio::sync::mpsc::channel::<LlmRequest>(16);
193
194        // Build effective lib_paths: extra (project-local) first, then defaults.
195        // Priority: variant_pkgs > extra_lib_paths > self.lib_paths
196        // (ALC_PACKAGES_PATH + global default).
197        let mut effective = extra_lib_paths;
198        effective.extend(self.lib_paths.iter().cloned());
199
200        // Obtain the log-capture sink before moving `metrics` into BridgeConfig.
201        // The same Arc is shared with Session so snapshot() can read recent_logs.
202        let log_sink = metrics.log_sink_handle();
203
204        let bridge_config = bridge::BridgeConfig {
205            llm_tx: Some(llm_tx),
206            ns: spec.namespace.clone(),
207            custom_metrics: metrics.custom_metrics_handle(),
208            stats: metrics.stats_handle(),
209            budget: metrics.budget_handle(),
210            progress: metrics.progress_handle(),
211            lib_paths: effective.clone(), // fork child VMs inherit project paths
212            variant_pkgs: variant_pkgs.clone(), // fork child VMs inherit variant overrides
213            state_store,
214            card_store,
215            scenarios_dir,
216            log_sink: Some(log_sink.clone()),
217        };
218        let lua_ctx = spec.ctx.clone();
219        let lua_code = spec.code.clone();
220
221        // 1. Spawn a dedicated VM for this session.
222        let (session_isle, session_driver) = AsyncIsle::spawn(move |lua| {
223            let mut reg = Registry::new();
224            // Variant pkgs first so alc.local.toml overrides win over global.
225            register_variant_pkgs(&mut reg, &variant_pkgs);
226            for path in &effective {
227                if let Some(resolver) = make_resolver(path) {
228                    reg.add(resolver);
229                }
230            }
231            reg.install(lua)?;
232            Ok(())
233        })
234        .await
235        .map_err(|e| format!("Session VM spawn failed: {e}"))?;
236
237        // 2. Setup: register alc.* StdLib, set ctx, load prelude.
238        //    Safe to set globals — this VM is exclusively ours.
239        session_isle
240            .exec(move |lua| {
241                let alc_table = lua.create_table()?;
242                bridge::register(lua, &alc_table, bridge_config)?;
243                lua.globals().set("alc", alc_table)?;
244
245                let ctx_value = lua.to_value(&lua_ctx)?;
246                lua.globals().set("ctx", ctx_value)?;
247
248                lua.load(PRELUDE)
249                    .exec()
250                    .map_err(|e| IsleError::Lua(format!("Prelude load failed: {e}")))?;
251
252                // Note: `print()` redirect is handled by `bridge::register` via
253                // `data::register_print` when `BridgeConfig::log_sink` is Some.
254                // That implementation routes to both tracing (alc.lua.print) and
255                // the per-session LogSink ring buffer, keeping stdout clean for
256                // the rmcp JSON-RPC stdio transport.
257                //
258                // `io.write` is intentionally left unchanged — scripts that
259                // explicitly target stdout/stderr can still use it.
260
261                // No need to clear package.loaded — fresh VM.
262
263                Ok("ok".to_string())
264            })
265            .await
266            .map_err(|e| format!("Session setup failed: {e}"))?;
267
268        // 3. Execute user code as a coroutine on the session VM.
269        let wrapped_code = format!("return alc.json_encode((function()\n{lua_code}\nend)())");
270        let exec_task = session_isle.spawn_coroutine_eval(&wrapped_code);
271
272        // Handle no longer needed — all requests have been sent.
273        // The driver keeps the channel alive until the session completes.
274        drop(session_isle);
275
276        Ok(Session::new(llm_rx, exec_task, metrics, session_driver))
277    }
278
279    /// Like [`start_session`] but registers `alc.env` inside the setup closure
280    /// with a pre-built, frozen env snapshot.
281    ///
282    /// The `env_map` must be fully populated from all sources (inject / dotenv /
283    /// os.env) **before** this call — the TIME boundary freeze happens at the
284    /// call site (i.e. `alc_run` invocation in the service layer).
285    ///
286    /// Existing `start_session` call sites remain untouched (out-of-scope wave
287    /// zero design).  Service layer code that resolves env calls this wrapper
288    /// instead.
289    #[allow(clippy::too_many_arguments)]
290    pub async fn start_session_with_env(
291        &self,
292        env_map: Arc<HashMap<String, String>>,
293        code: String,
294        ctx: serde_json::Value,
295        extra_lib_paths: Vec<PathBuf>,
296        variant_pkgs: Vec<VariantPkg>,
297        state_store: Arc<JsonFileStore>,
298        card_store: Arc<FileCardStore>,
299        scenarios_dir: PathBuf,
300    ) -> Result<Session, String> {
301        let spec = ExecutionSpec::new(code, ctx);
302        let metrics = ExecutionMetrics::new();
303
304        if let Some(budget) = Budget::from_ctx(&spec.ctx) {
305            metrics.set_budget(budget);
306        }
307
308        let (llm_tx, llm_rx) = tokio::sync::mpsc::channel::<LlmRequest>(16);
309
310        let mut effective = extra_lib_paths;
311        effective.extend(self.lib_paths.iter().cloned());
312
313        let log_sink = metrics.log_sink_handle();
314
315        let bridge_config = bridge::BridgeConfig {
316            llm_tx: Some(llm_tx),
317            ns: spec.namespace.clone(),
318            custom_metrics: metrics.custom_metrics_handle(),
319            stats: metrics.stats_handle(),
320            budget: metrics.budget_handle(),
321            progress: metrics.progress_handle(),
322            lib_paths: effective.clone(),
323            variant_pkgs: variant_pkgs.clone(),
324            state_store,
325            card_store,
326            scenarios_dir,
327            log_sink: Some(log_sink.clone()),
328        };
329        let lua_ctx = spec.ctx.clone();
330        let lua_code = spec.code.clone();
331
332        let (session_isle, session_driver) = AsyncIsle::spawn(move |lua| {
333            let mut reg = Registry::new();
334            register_variant_pkgs(&mut reg, &variant_pkgs);
335            for path in &effective {
336                if let Some(resolver) = make_resolver(path) {
337                    reg.add(resolver);
338                }
339            }
340            reg.install(lua)?;
341            Ok(())
342        })
343        .await
344        .map_err(|e| format!("Session VM spawn failed: {e}"))?;
345
346        session_isle
347            .exec(move |lua| {
348                let alc_table = lua.create_table()?;
349                bridge::register(lua, &alc_table, bridge_config)?;
350                // Register alc.env with the frozen snapshot (TIME boundary: fully
351                // populated before this call, never re-read from OS after this point).
352                bridge::register_env(lua, &alc_table, env_map)
353                    .map_err(|e| IsleError::Lua(e.to_string()))?;
354                lua.globals().set("alc", alc_table)?;
355
356                let ctx_value = lua.to_value(&lua_ctx)?;
357                lua.globals().set("ctx", ctx_value)?;
358
359                lua.load(PRELUDE)
360                    .exec()
361                    .map_err(|e| IsleError::Lua(format!("Prelude load failed: {e}")))?;
362
363                Ok("ok".to_string())
364            })
365            .await
366            .map_err(|e| format!("Session setup failed: {e}"))?;
367
368        let wrapped_code = format!("return alc.json_encode((function()\n{lua_code}\nend)())");
369        let exec_task = session_isle.spawn_coroutine_eval(&wrapped_code);
370
371        drop(session_isle);
372
373        Ok(Session::new(llm_rx, exec_task, metrics, session_driver))
374    }
375}
376
377#[cfg(test)]
378mod tests {
379    use super::*;
380    use std::fs;
381
382    /// Create a temporary package directory with the given name and `init.lua` content.
383    fn make_pkg_dir(parent: &std::path::Path, pkg_name: &str, init_lua: &str) -> PathBuf {
384        let pkg_dir = parent.join(pkg_name);
385        fs::create_dir_all(&pkg_dir).unwrap();
386        fs::write(pkg_dir.join("init.lua"), init_lua).unwrap();
387        parent.to_path_buf()
388    }
389
390    /// `extra_lib_paths=vec![]` — eval_simple must work as before.
391    #[tokio::test]
392    async fn no_extra_lib_paths_eval_simple() {
393        let executor = Executor::new(vec![]).await.unwrap();
394        let result = executor.eval_simple("return 42".to_string()).await.unwrap();
395        assert_eq!(result, serde_json::json!(42));
396    }
397
398    /// `eval_simple_with_paths` with a project-local package.
399    ///
400    /// Creates a temp dir with `test_pkg/init.lua` returning `{value = 99}`,
401    /// then verifies `require("test_pkg").value` == 99 via the extra resolver.
402    #[tokio::test]
403    async fn extra_lib_paths_reachable_via_eval_simple_with_paths() {
404        let tmp = tempfile::tempdir().unwrap();
405        let pkg_root = make_pkg_dir(tmp.path(), "test_pkg", "return { value = 99 }");
406
407        let executor = Executor::new(vec![]).await.unwrap();
408        let code = r#"
409            local pkg = require("test_pkg")
410            return pkg.value
411        "#
412        .to_string();
413
414        let result = executor
415            .eval_simple_with_paths(code, vec![pkg_root], vec![])
416            .await
417            .unwrap();
418
419        assert_eq!(result, serde_json::json!(99));
420    }
421
422    /// Variant pkg with a non-matching directory name resolves via
423    /// `VariantRootResolver` + `PrefixResolver`.
424    #[tokio::test]
425    async fn variant_pkg_resolves_root_and_submodule() {
426        let tmp = tempfile::tempdir().unwrap();
427        // pkg dir name (`physical-dir`) intentionally differs from the
428        // require name (`logical_name`) — variant scope must support this.
429        let pkg_dir = tmp.path().join("physical-dir");
430        fs::create_dir_all(&pkg_dir).unwrap();
431        fs::write(
432            pkg_dir.join("init.lua"),
433            "return { greet = function(n) return 'hi-' .. n end, sub = require('logical_name.sub') }",
434        )
435        .unwrap();
436        fs::write(pkg_dir.join("sub.lua"), "return { value = 7 }").unwrap();
437
438        let executor = Executor::new(vec![]).await.unwrap();
439        let code = r#"
440            local pkg = require("logical_name")
441            return { msg = pkg.greet("there"), sub_value = pkg.sub.value }
442        "#
443        .to_string();
444
445        let result = executor
446            .eval_simple_with_paths(code, vec![], vec![VariantPkg::new("logical_name", pkg_dir)])
447            .await
448            .unwrap();
449
450        assert_eq!(result["msg"], serde_json::json!("hi-there"));
451        assert_eq!(result["sub_value"], serde_json::json!(7));
452    }
453
454    /// Variant pkg overrides a same-name global pkg (priority: variant > global).
455    #[tokio::test]
456    async fn variant_pkg_overrides_global_same_name() {
457        let global_tmp = tempfile::tempdir().unwrap();
458        let variant_tmp = tempfile::tempdir().unwrap();
459
460        // Global: my_pkg returns 1
461        make_pkg_dir(global_tmp.path(), "my_pkg", "return { value = 1 }");
462        // Variant: my_pkg returns 2 — must win
463        let variant_dir = variant_tmp.path().join("my_pkg");
464        fs::create_dir_all(&variant_dir).unwrap();
465        fs::write(variant_dir.join("init.lua"), "return { value = 2 }").unwrap();
466
467        let executor = Executor::new(vec![global_tmp.path().to_path_buf()])
468            .await
469            .unwrap();
470
471        let code = r#"
472            local pkg = require("my_pkg")
473            return pkg.value
474        "#
475        .to_string();
476
477        let result = executor
478            .eval_simple_with_paths(code, vec![], vec![VariantPkg::new("my_pkg", variant_dir)])
479            .await
480            .unwrap();
481
482        assert_eq!(result, serde_json::json!(2));
483    }
484
485    /// When `extra_lib_paths` has a pkg with the same name as one in global paths,
486    /// the extra one takes priority (it is prepended).
487    #[tokio::test]
488    async fn extra_lib_paths_priority_over_default() {
489        let global_tmp = tempfile::tempdir().unwrap();
490        let extra_tmp = tempfile::tempdir().unwrap();
491
492        // Global: test_pkg returns 1
493        make_pkg_dir(global_tmp.path(), "test_pkg", "return { value = 1 }");
494        // Extra (project-local): test_pkg returns 2
495        let extra_root = make_pkg_dir(extra_tmp.path(), "test_pkg", "return { value = 2 }");
496
497        // Executor has global as its lib_paths.
498        let executor = Executor::new(vec![global_tmp.path().to_path_buf()])
499            .await
500            .unwrap();
501
502        let code = r#"
503            local pkg = require("test_pkg")
504            return pkg.value
505        "#
506        .to_string();
507
508        let result = executor
509            .eval_simple_with_paths(code, vec![extra_root], vec![])
510            .await
511            .unwrap();
512
513        // extra (2) must win over global (1)
514        assert_eq!(result, serde_json::json!(2));
515    }
516}