Skip to main content

algocline_engine/
executor.rs

//! Lua execution engine.
//!
//! Orchestrates StdLib injection and Lua execution for each session:
//!
//! 1. **Layer 0** — [`bridge::register`] injects Rust-backed `alc.*` primitives
//! 2. **Layer 1** — [`PRELUDE`] adds Lua-based combinators (`alc.map`, etc.)
//! 3. **Layer 2** — [`mlua_pkg::Registry`] makes `require("ucb")` etc.
//!    resolve from `~/.algocline/packages/`
//!
//! ## Execution models
//!
//! - **`eval_simple`** — sync eval on a shared VM (no LLM bridge).
//!   For lightweight ops like reading package metadata.
//! - **`start_session`** — spawns a **dedicated VM per session**.
//!   Each session gets an isolated Lua VM so concurrent sessions
//!   cannot interfere with each other's globals (`alc`, `ctx`).
//!   `alc.llm()` yields the coroutine, and the VM is cleaned up
//!   when the session completes or is abandoned.
19
20use std::path::PathBuf;
21
22use algocline_core::{Budget, ExecutionMetrics, ExecutionSpec};
23use mlua::LuaSerdeExt;
24use mlua_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
25use mlua_pkg::{resolvers::FsResolver, Registry};
26
27use crate::bridge;
28use crate::llm_bridge::LlmRequest;
29use crate::session::Session;
30
31/// Layer 1: Prelude combinators (map, reduce, vote, filter).
32/// Embedded at compile time and loaded into every session.
33const PRELUDE: &str = include_str!("prelude.lua");
34
35/// Lua execution engine.
36///
37/// Holds a **shared VM** for lightweight stateless operations (`eval_simple`)
38/// and spawns **per-session VMs** for coroutine-based execution (`start_session`).
39///
40/// Per-session VMs eliminate global namespace pollution between concurrent
41/// sessions — each session's `alc`, `ctx`, and `package.loaded` are fully
42/// isolated.
43pub struct Executor {
44    /// Shared VM for eval_simple (stateless, no session globals).
45    isle: AsyncIsle,
46    _driver: AsyncIsleDriver,
47    /// Package resolver paths, cloned into each per-session VM.
48    lib_paths: Vec<PathBuf>,
49}
50
51impl Executor {
52    pub async fn new(lib_paths: Vec<PathBuf>) -> anyhow::Result<Self> {
53        let paths_for_shared = lib_paths.clone();
54        let (isle, driver) = AsyncIsle::spawn(move |lua| {
55            let mut reg = Registry::new();
56            for path in &paths_for_shared {
57                if let Ok(resolver) = FsResolver::new(path) {
58                    reg.add(resolver);
59                }
60            }
61            reg.install(lua)?;
62            Ok(())
63        })
64        .await?;
65
66        Ok(Self {
67            isle,
68            _driver: driver,
69            lib_paths,
70        })
71    }
72
73    /// Evaluate Lua code without LLM bridge. For lightweight operations
74    /// like reading package metadata.
75    ///
76    /// Uses the shared VM. `extra_lib_paths` must be empty — use
77    /// [`eval_simple_with_paths`] when project-local paths are needed.
78    pub async fn eval_simple(&self, code: String) -> Result<serde_json::Value, String> {
79        self.eval_simple_with_paths(code, vec![]).await
80    }
81
82    /// Evaluate Lua code without LLM bridge, with optional extra package paths.
83    ///
84    /// When `extra_lib_paths` is empty, reuses the shared VM (cheap).
85    /// When non-empty, spawns a dedicated VM so the extra resolvers are active
86    /// (slightly more expensive, but `pkg_list` is the only caller and it is
87    /// low-frequency).
88    pub async fn eval_simple_with_paths(
89        &self,
90        code: String,
91        extra_lib_paths: Vec<PathBuf>,
92    ) -> Result<serde_json::Value, String> {
93        if extra_lib_paths.is_empty() {
94            // Fast path: reuse the long-lived shared VM.
95            let task = self.isle.spawn_exec(move |lua| {
96                let result: mlua::Value = lua
97                    .load(&code)
98                    .eval()
99                    .map_err(|e| IsleError::Lua(e.to_string()))?;
100                let json: serde_json::Value = lua
101                    .from_value(result)
102                    .map_err(|e| IsleError::Lua(e.to_string()))?;
103                serde_json::to_string(&json)
104                    .map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
105            });
106            let json_str = task.await.map_err(|e| e.to_string())?;
107            return serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"));
108        }
109
110        // Slow path: spawn a dedicated VM with extra resolvers prepended.
111        let mut effective = extra_lib_paths;
112        effective.extend(self.lib_paths.iter().cloned());
113
114        let (tmp_isle, _tmp_driver) = AsyncIsle::spawn(move |lua| {
115            let mut reg = Registry::new();
116            for path in &effective {
117                if let Ok(resolver) = FsResolver::new(path) {
118                    reg.add(resolver);
119                }
120            }
121            reg.install(lua)?;
122            Ok(())
123        })
124        .await
125        .map_err(|e| format!("eval_simple VM spawn failed: {e}"))?;
126
127        let task = tmp_isle.spawn_exec(move |lua| {
128            let result: mlua::Value = lua
129                .load(&code)
130                .eval()
131                .map_err(|e| IsleError::Lua(e.to_string()))?;
132            let json: serde_json::Value = lua
133                .from_value(result)
134                .map_err(|e| IsleError::Lua(e.to_string()))?;
135            serde_json::to_string(&json).map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
136        });
137
138        let json_str = task.await.map_err(|e| e.to_string())?;
139        serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"))
140    }
141
142    /// Start a new Lua execution session on a **dedicated VM**.
143    ///
144    /// Each session gets its own Lua VM (OS thread + mlua instance) so
145    /// concurrent sessions cannot interfere with each other's globals.
146    /// The VM is cleaned up automatically when the session completes or
147    /// is abandoned (all senders drop → channel closes → thread exits).
148    ///
149    /// `extra_lib_paths` are prepended to `self.lib_paths` so project-local
150    /// packages take precedence over the global package directory.
151    pub async fn start_session(
152        &self,
153        code: String,
154        ctx: serde_json::Value,
155        extra_lib_paths: Vec<PathBuf>,
156    ) -> Result<Session, String> {
157        let spec = ExecutionSpec::new(code, ctx);
158        let metrics = ExecutionMetrics::new();
159
160        // Extract and apply budget from ctx.budget
161        if let Some(budget) = Budget::from_ctx(&spec.ctx) {
162            metrics.set_budget(budget);
163        }
164
165        let (llm_tx, llm_rx) = tokio::sync::mpsc::channel::<LlmRequest>(16);
166
167        // Build effective lib_paths: extra (project-local) first, then defaults.
168        // Priority: extra_lib_paths > self.lib_paths (ALC_PACKAGES_PATH + global default).
169        let mut effective = extra_lib_paths;
170        effective.extend(self.lib_paths.iter().cloned());
171
172        let bridge_config = bridge::BridgeConfig {
173            llm_tx: Some(llm_tx),
174            ns: spec.namespace.clone(),
175            custom_metrics: metrics.custom_metrics_handle(),
176            budget: metrics.budget_handle(),
177            progress: metrics.progress_handle(),
178            lib_paths: effective.clone(), // fork child VMs inherit project paths
179        };
180        let lua_ctx = spec.ctx.clone();
181        let lua_code = spec.code.clone();
182
183        // 1. Spawn a dedicated VM for this session.
184        let (session_isle, session_driver) = AsyncIsle::spawn(move |lua| {
185            let mut reg = Registry::new();
186            for path in &effective {
187                if let Ok(resolver) = FsResolver::new(path) {
188                    reg.add(resolver);
189                }
190            }
191            reg.install(lua)?;
192            Ok(())
193        })
194        .await
195        .map_err(|e| format!("Session VM spawn failed: {e}"))?;
196
197        // 2. Setup: register alc.* StdLib, set ctx, load prelude.
198        //    Safe to set globals — this VM is exclusively ours.
199        session_isle
200            .exec(move |lua| {
201                let alc_table = lua.create_table()?;
202                bridge::register(lua, &alc_table, bridge_config)?;
203                lua.globals().set("alc", alc_table)?;
204
205                let ctx_value = lua.to_value(&lua_ctx)?;
206                lua.globals().set("ctx", ctx_value)?;
207
208                lua.load(PRELUDE)
209                    .exec()
210                    .map_err(|e| IsleError::Lua(format!("Prelude load failed: {e}")))?;
211
212                // No need to clear package.loaded — fresh VM.
213
214                Ok("ok".to_string())
215            })
216            .await
217            .map_err(|e| format!("Session setup failed: {e}"))?;
218
219        // 3. Execute user code as a coroutine on the session VM.
220        let wrapped_code = format!("return alc.json_encode((function()\n{lua_code}\nend)())");
221        let exec_task = session_isle.spawn_coroutine_eval(&wrapped_code);
222
223        // Handle no longer needed — all requests have been sent.
224        // The driver keeps the channel alive until the session completes.
225        drop(session_isle);
226
227        Ok(Session::new(llm_rx, exec_task, metrics, session_driver))
228    }
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::path::Path;

    /// Write `<parent>/<pkg_name>/init.lua` with the given Lua source and
    /// return `parent`, i.e. the directory to pass as a library path.
    fn make_pkg_dir(parent: &Path, pkg_name: &str, init_lua: &str) -> PathBuf {
        let package = parent.join(pkg_name);
        fs::create_dir_all(&package).unwrap();

        let entry_point = package.join("init.lua");
        fs::write(entry_point, init_lua).unwrap();

        parent.to_owned()
    }

    /// With no extra library paths, `eval_simple` evaluates on the shared VM.
    #[tokio::test]
    async fn no_extra_lib_paths_eval_simple() {
        let engine = Executor::new(Vec::new()).await.unwrap();

        let value = engine
            .eval_simple(String::from("return 42"))
            .await
            .unwrap();

        assert_eq!(value, serde_json::json!(42));
    }

    /// A package that exists only under an extra path is reachable.
    ///
    /// A temp dir receives `test_pkg/init.lua` returning `{value = 99}`; the
    /// evaluated code requires it and must see `value == 99`.
    #[tokio::test]
    async fn extra_lib_paths_reachable_via_eval_simple_with_paths() {
        let scratch = tempfile::tempdir().unwrap();
        let project_root = make_pkg_dir(scratch.path(), "test_pkg", "return { value = 99 }");

        let engine = Executor::new(Vec::new()).await.unwrap();
        let lua_src = String::from(
            r#"
            local pkg = require("test_pkg")
            return pkg.value
        "#,
        );

        let value = engine
            .eval_simple_with_paths(lua_src, vec![project_root])
            .await
            .unwrap();

        assert_eq!(value, serde_json::json!(99));
    }

    /// When the same package name exists under both the executor's own paths
    /// and the extra paths, the extra one wins because it is searched first.
    #[tokio::test]
    async fn extra_lib_paths_priority_over_default() {
        let global_dir = tempfile::tempdir().unwrap();
        let project_dir = tempfile::tempdir().unwrap();

        // Global copy of test_pkg reports 1; the project-local copy reports 2.
        make_pkg_dir(global_dir.path(), "test_pkg", "return { value = 1 }");
        let project_root = make_pkg_dir(project_dir.path(), "test_pkg", "return { value = 2 }");

        // The executor itself only knows about the global directory.
        let global_root = global_dir.path().to_path_buf();
        let engine = Executor::new(vec![global_root]).await.unwrap();

        let lua_src = String::from(
            r#"
            local pkg = require("test_pkg")
            return pkg.value
        "#,
        );

        let value = engine
            .eval_simple_with_paths(lua_src, vec![project_root])
            .await
            .unwrap();

        // The project-local value (2) must shadow the global one (1).
        assert_eq!(value, serde_json::json!(2));
    }
}