Skip to main content

algocline_engine/
executor.rs

1//! Lua execution engine.
2//!
3//! Orchestrates StdLib injection and Lua execution for each session:
4//!
5//! 1. **Layer 0** — [`bridge::register`] injects Rust-backed `alc.*` primitives
6//! 2. **Layer 1** — [`PRELUDE`] adds Lua-based combinators (`alc.map`, etc.)
7//! 3. **Layer 2** — [`mlua_pkg::Registry`] makes `require("ucb")` etc.
8//!    resolve from `~/.algocline/packages/`
9//!
10//! ## Execution models
11//!
12//! - **`eval_simple`** — sync eval on a shared VM (no LLM bridge).
13//!   For lightweight ops like reading package metadata.
14//! - **`start_session`** — spawns a **dedicated VM per session**.
15//!   Each session gets an isolated Lua VM so concurrent sessions
16//!   cannot interfere with each other's globals (`alc`, `ctx`).
17//!   `alc.llm()` yields the coroutine, and the VM is cleaned up
18//!   when the session completes or is abandoned.
19
20use std::path::PathBuf;
21
22use algocline_core::{Budget, ExecutionMetrics, ExecutionSpec};
23use mlua::LuaSerdeExt;
24use mlua_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
25use mlua_pkg::{resolvers::FsResolver, sandbox::SymlinkAwareSandbox, Registry};
26
27use crate::bridge;
28use crate::llm_bridge::LlmRequest;
29use crate::session::Session;
30
31/// Build an `FsResolver` for the given path.
32///
33/// By default uses `SymlinkAwareSandbox` so that `alc_pkg_link` symlinks
34/// are followed. Set `ALC_PKG_STRICT=1` to use the strict `FsSandbox`
35/// (rejects all symlinks pointing outside the root).
36fn make_resolver(path: &std::path::Path) -> Option<FsResolver> {
37    let strict = std::env::var("ALC_PKG_STRICT")
38        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
39        .unwrap_or(false);
40
41    if strict {
42        FsResolver::new(path).ok()
43    } else {
44        SymlinkAwareSandbox::new(path)
45            .ok()
46            .map(FsResolver::with_sandbox)
47    }
48}
49
50/// Layer 1: Prelude combinators (map, reduce, vote, filter).
51/// Embedded at compile time and loaded into every session.
52const PRELUDE: &str = include_str!("prelude.lua");
53
54/// Lua execution engine.
55///
56/// Holds a **shared VM** for lightweight stateless operations (`eval_simple`)
57/// and spawns **per-session VMs** for coroutine-based execution (`start_session`).
58///
59/// Per-session VMs eliminate global namespace pollution between concurrent
60/// sessions — each session's `alc`, `ctx`, and `package.loaded` are fully
61/// isolated.
62pub struct Executor {
63    /// Shared VM for eval_simple (stateless, no session globals).
64    isle: AsyncIsle,
65    _driver: AsyncIsleDriver,
66    /// Package resolver paths, cloned into each per-session VM.
67    lib_paths: Vec<PathBuf>,
68}
69
70impl Executor {
71    pub async fn new(lib_paths: Vec<PathBuf>) -> anyhow::Result<Self> {
72        let paths_for_shared = lib_paths.clone();
73        let (isle, driver) = AsyncIsle::spawn(move |lua| {
74            let mut reg = Registry::new();
75            for path in &paths_for_shared {
76                if let Some(resolver) = make_resolver(path) {
77                    reg.add(resolver);
78                }
79            }
80            reg.install(lua)?;
81            Ok(())
82        })
83        .await?;
84
85        Ok(Self {
86            isle,
87            _driver: driver,
88            lib_paths,
89        })
90    }
91
92    /// Evaluate Lua code without LLM bridge. For lightweight operations
93    /// like reading package metadata.
94    ///
95    /// Uses the shared VM. `extra_lib_paths` must be empty — use
96    /// [`eval_simple_with_paths`] when project-local paths are needed.
97    pub async fn eval_simple(&self, code: String) -> Result<serde_json::Value, String> {
98        self.eval_simple_with_paths(code, vec![]).await
99    }
100
101    /// Evaluate Lua code without LLM bridge, with optional extra package paths.
102    ///
103    /// When `extra_lib_paths` is empty, reuses the shared VM (cheap).
104    /// When non-empty, spawns a dedicated VM so the extra resolvers are active
105    /// (slightly more expensive, but `pkg_list` is the only caller and it is
106    /// low-frequency).
107    pub async fn eval_simple_with_paths(
108        &self,
109        code: String,
110        extra_lib_paths: Vec<PathBuf>,
111    ) -> Result<serde_json::Value, String> {
112        if extra_lib_paths.is_empty() {
113            // Fast path: reuse the long-lived shared VM.
114            let task = self.isle.spawn_exec(move |lua| {
115                let result: mlua::Value = lua
116                    .load(&code)
117                    .eval()
118                    .map_err(|e| IsleError::Lua(e.to_string()))?;
119                let json: serde_json::Value = lua
120                    .from_value(result)
121                    .map_err(|e| IsleError::Lua(e.to_string()))?;
122                serde_json::to_string(&json)
123                    .map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
124            });
125            let json_str = task.await.map_err(|e| e.to_string())?;
126            return serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"));
127        }
128
129        // Slow path: spawn a dedicated VM with extra resolvers prepended.
130        let mut effective = extra_lib_paths;
131        effective.extend(self.lib_paths.iter().cloned());
132
133        let (tmp_isle, _tmp_driver) = AsyncIsle::spawn(move |lua| {
134            let mut reg = Registry::new();
135            for path in &effective {
136                if let Some(resolver) = make_resolver(path) {
137                    reg.add(resolver);
138                }
139            }
140            reg.install(lua)?;
141            Ok(())
142        })
143        .await
144        .map_err(|e| format!("eval_simple VM spawn failed: {e}"))?;
145
146        let task = tmp_isle.spawn_exec(move |lua| {
147            let result: mlua::Value = lua
148                .load(&code)
149                .eval()
150                .map_err(|e| IsleError::Lua(e.to_string()))?;
151            let json: serde_json::Value = lua
152                .from_value(result)
153                .map_err(|e| IsleError::Lua(e.to_string()))?;
154            serde_json::to_string(&json).map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
155        });
156
157        let json_str = task.await.map_err(|e| e.to_string())?;
158        serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"))
159    }
160
161    /// Start a new Lua execution session on a **dedicated VM**.
162    ///
163    /// Each session gets its own Lua VM (OS thread + mlua instance) so
164    /// concurrent sessions cannot interfere with each other's globals.
165    /// The VM is cleaned up automatically when the session completes or
166    /// is abandoned (all senders drop → channel closes → thread exits).
167    ///
168    /// `extra_lib_paths` are prepended to `self.lib_paths` so project-local
169    /// packages take precedence over the global package directory.
170    pub async fn start_session(
171        &self,
172        code: String,
173        ctx: serde_json::Value,
174        extra_lib_paths: Vec<PathBuf>,
175    ) -> Result<Session, String> {
176        let spec = ExecutionSpec::new(code, ctx);
177        let metrics = ExecutionMetrics::new();
178
179        // Extract and apply budget from ctx.budget
180        if let Some(budget) = Budget::from_ctx(&spec.ctx) {
181            metrics.set_budget(budget);
182        }
183
184        let (llm_tx, llm_rx) = tokio::sync::mpsc::channel::<LlmRequest>(16);
185
186        // Build effective lib_paths: extra (project-local) first, then defaults.
187        // Priority: extra_lib_paths > self.lib_paths (ALC_PACKAGES_PATH + global default).
188        let mut effective = extra_lib_paths;
189        effective.extend(self.lib_paths.iter().cloned());
190
191        let bridge_config = bridge::BridgeConfig {
192            llm_tx: Some(llm_tx),
193            ns: spec.namespace.clone(),
194            custom_metrics: metrics.custom_metrics_handle(),
195            budget: metrics.budget_handle(),
196            progress: metrics.progress_handle(),
197            lib_paths: effective.clone(), // fork child VMs inherit project paths
198        };
199        let lua_ctx = spec.ctx.clone();
200        let lua_code = spec.code.clone();
201
202        // 1. Spawn a dedicated VM for this session.
203        let (session_isle, session_driver) = AsyncIsle::spawn(move |lua| {
204            let mut reg = Registry::new();
205            for path in &effective {
206                if let Some(resolver) = make_resolver(path) {
207                    reg.add(resolver);
208                }
209            }
210            reg.install(lua)?;
211            Ok(())
212        })
213        .await
214        .map_err(|e| format!("Session VM spawn failed: {e}"))?;
215
216        // 2. Setup: register alc.* StdLib, set ctx, load prelude.
217        //    Safe to set globals — this VM is exclusively ours.
218        session_isle
219            .exec(move |lua| {
220                let alc_table = lua.create_table()?;
221                bridge::register(lua, &alc_table, bridge_config)?;
222                lua.globals().set("alc", alc_table)?;
223
224                let ctx_value = lua.to_value(&lua_ctx)?;
225                lua.globals().set("ctx", ctx_value)?;
226
227                lua.load(PRELUDE)
228                    .exec()
229                    .map_err(|e| IsleError::Lua(format!("Prelude load failed: {e}")))?;
230
231                // No need to clear package.loaded — fresh VM.
232
233                Ok("ok".to_string())
234            })
235            .await
236            .map_err(|e| format!("Session setup failed: {e}"))?;
237
238        // 3. Execute user code as a coroutine on the session VM.
239        let wrapped_code = format!("return alc.json_encode((function()\n{lua_code}\nend)())");
240        let exec_task = session_isle.spawn_coroutine_eval(&wrapped_code);
241
242        // Handle no longer needed — all requests have been sent.
243        // The driver keeps the channel alive until the session completes.
244        drop(session_isle);
245
246        Ok(Session::new(llm_rx, exec_task, metrics, session_driver))
247    }
248}
249
250#[cfg(test)]
251mod tests {
252    use super::*;
253    use std::fs;
254
255    /// Create a temporary package directory with the given name and `init.lua` content.
256    fn make_pkg_dir(parent: &std::path::Path, pkg_name: &str, init_lua: &str) -> PathBuf {
257        let pkg_dir = parent.join(pkg_name);
258        fs::create_dir_all(&pkg_dir).unwrap();
259        fs::write(pkg_dir.join("init.lua"), init_lua).unwrap();
260        parent.to_path_buf()
261    }
262
263    /// `extra_lib_paths=vec![]` — eval_simple must work as before.
264    #[tokio::test]
265    async fn no_extra_lib_paths_eval_simple() {
266        let executor = Executor::new(vec![]).await.unwrap();
267        let result = executor.eval_simple("return 42".to_string()).await.unwrap();
268        assert_eq!(result, serde_json::json!(42));
269    }
270
271    /// `eval_simple_with_paths` with a project-local package.
272    ///
273    /// Creates a temp dir with `test_pkg/init.lua` returning `{value = 99}`,
274    /// then verifies `require("test_pkg").value` == 99 via the extra resolver.
275    #[tokio::test]
276    async fn extra_lib_paths_reachable_via_eval_simple_with_paths() {
277        let tmp = tempfile::tempdir().unwrap();
278        let pkg_root = make_pkg_dir(tmp.path(), "test_pkg", "return { value = 99 }");
279
280        let executor = Executor::new(vec![]).await.unwrap();
281        let code = r#"
282            local pkg = require("test_pkg")
283            return pkg.value
284        "#
285        .to_string();
286
287        let result = executor
288            .eval_simple_with_paths(code, vec![pkg_root])
289            .await
290            .unwrap();
291
292        assert_eq!(result, serde_json::json!(99));
293    }
294
295    /// When `extra_lib_paths` has a pkg with the same name as one in global paths,
296    /// the extra one takes priority (it is prepended).
297    #[tokio::test]
298    async fn extra_lib_paths_priority_over_default() {
299        let global_tmp = tempfile::tempdir().unwrap();
300        let extra_tmp = tempfile::tempdir().unwrap();
301
302        // Global: test_pkg returns 1
303        make_pkg_dir(global_tmp.path(), "test_pkg", "return { value = 1 }");
304        // Extra (project-local): test_pkg returns 2
305        let extra_root = make_pkg_dir(extra_tmp.path(), "test_pkg", "return { value = 2 }");
306
307        // Executor has global as its lib_paths.
308        let executor = Executor::new(vec![global_tmp.path().to_path_buf()])
309            .await
310            .unwrap();
311
312        let code = r#"
313            local pkg = require("test_pkg")
314            return pkg.value
315        "#
316        .to_string();
317
318        let result = executor
319            .eval_simple_with_paths(code, vec![extra_root])
320            .await
321            .unwrap();
322
323        // extra (2) must win over global (1)
324        assert_eq!(result, serde_json::json!(2));
325    }
326}