1use std::collections::HashMap;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use algocline_core::{Budget, ExecutionMetrics, ExecutionSpec};
25use mlua::LuaSerdeExt;
26use mlua_isle::{AsyncIsle, AsyncIsleDriver, IsleError};
27use mlua_pkg::Registry;
28
29use crate::bridge;
30use crate::card::FileCardStore;
31use crate::llm_bridge::LlmRequest;
32use crate::resolver_factory::make_resolver;
33use crate::session::Session;
34use crate::state::JsonFileStore;
35use crate::variant_pkg::{register_variant_pkgs, VariantPkg};
36
37const PRELUDE: &str = include_str!("prelude.lua");
40
41pub struct Executor {
50 isle: AsyncIsle,
52 _driver: AsyncIsleDriver,
53 lib_paths: Vec<PathBuf>,
55}
56
57impl Executor {
58 pub async fn new(lib_paths: Vec<PathBuf>) -> anyhow::Result<Self> {
59 let paths_for_shared = lib_paths.clone();
60 let (isle, driver) = AsyncIsle::spawn(move |lua| {
61 let mut reg = Registry::new();
62 for path in &paths_for_shared {
63 if let Some(resolver) = make_resolver(path) {
64 reg.add(resolver);
65 }
66 }
67 reg.install(lua)?;
68 Ok(())
69 })
70 .await?;
71
72 Ok(Self {
73 isle,
74 _driver: driver,
75 lib_paths,
76 })
77 }
78
79 pub async fn eval_simple(&self, code: String) -> Result<serde_json::Value, String> {
85 self.eval_simple_with_paths(code, vec![], vec![]).await
86 }
87
88 pub async fn eval_simple_with_paths(
101 &self,
102 code: String,
103 extra_lib_paths: Vec<PathBuf>,
104 variant_pkgs: Vec<VariantPkg>,
105 ) -> Result<serde_json::Value, String> {
106 if extra_lib_paths.is_empty() && variant_pkgs.is_empty() {
107 let task = self.isle.spawn_exec(move |lua| {
109 let result: mlua::Value = lua
110 .load(&code)
111 .eval()
112 .map_err(|e| IsleError::Lua(e.to_string()))?;
113 let json: serde_json::Value = lua
114 .from_value(result)
115 .map_err(|e| IsleError::Lua(e.to_string()))?;
116 serde_json::to_string(&json)
117 .map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
118 });
119 let json_str = task.await.map_err(|e| e.to_string())?;
120 return serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"));
121 }
122
123 let mut effective = extra_lib_paths;
125 effective.extend(self.lib_paths.iter().cloned());
126
127 let (tmp_isle, _tmp_driver) = AsyncIsle::spawn(move |lua| {
128 let mut reg = Registry::new();
129 register_variant_pkgs(&mut reg, &variant_pkgs);
131 for path in &effective {
132 if let Some(resolver) = make_resolver(path) {
133 reg.add(resolver);
134 }
135 }
136 reg.install(lua)?;
137 Ok(())
138 })
139 .await
140 .map_err(|e| format!("eval_simple VM spawn failed: {e}"))?;
141
142 let task = tmp_isle.spawn_exec(move |lua| {
143 let result: mlua::Value = lua
144 .load(&code)
145 .eval()
146 .map_err(|e| IsleError::Lua(e.to_string()))?;
147 let json: serde_json::Value = lua
148 .from_value(result)
149 .map_err(|e| IsleError::Lua(e.to_string()))?;
150 serde_json::to_string(&json).map_err(|e| IsleError::Lua(format!("JSON serialize: {e}")))
151 });
152
153 let json_str = task.await.map_err(|e| e.to_string())?;
154 serde_json::from_str(&json_str).map_err(|e| format!("JSON parse: {e}"))
155 }
156
157 #[allow(clippy::too_many_arguments)]
174 pub async fn start_session(
175 &self,
176 code: String,
177 ctx: serde_json::Value,
178 extra_lib_paths: Vec<PathBuf>,
179 variant_pkgs: Vec<VariantPkg>,
180 state_store: Arc<JsonFileStore>,
181 card_store: Arc<FileCardStore>,
182 scenarios_dir: PathBuf,
183 ) -> Result<Session, String> {
184 let spec = ExecutionSpec::new(code, ctx);
185 let metrics = ExecutionMetrics::new();
186
187 if let Some(budget) = Budget::from_ctx(&spec.ctx) {
189 metrics.set_budget(budget);
190 }
191
192 let (llm_tx, llm_rx) = tokio::sync::mpsc::channel::<LlmRequest>(16);
193
194 let mut effective = extra_lib_paths;
198 effective.extend(self.lib_paths.iter().cloned());
199
200 let log_sink = metrics.log_sink_handle();
203
204 let bridge_config = bridge::BridgeConfig {
205 llm_tx: Some(llm_tx),
206 ns: spec.namespace.clone(),
207 custom_metrics: metrics.custom_metrics_handle(),
208 stats: metrics.stats_handle(),
209 budget: metrics.budget_handle(),
210 progress: metrics.progress_handle(),
211 lib_paths: effective.clone(), variant_pkgs: variant_pkgs.clone(), state_store,
214 card_store,
215 scenarios_dir,
216 log_sink: Some(log_sink.clone()),
217 };
218 let lua_ctx = spec.ctx.clone();
219 let lua_code = spec.code.clone();
220
221 let (session_isle, session_driver) = AsyncIsle::spawn(move |lua| {
223 let mut reg = Registry::new();
224 register_variant_pkgs(&mut reg, &variant_pkgs);
226 for path in &effective {
227 if let Some(resolver) = make_resolver(path) {
228 reg.add(resolver);
229 }
230 }
231 reg.install(lua)?;
232 Ok(())
233 })
234 .await
235 .map_err(|e| format!("Session VM spawn failed: {e}"))?;
236
237 session_isle
240 .exec(move |lua| {
241 let alc_table = lua.create_table()?;
242 bridge::register(lua, &alc_table, bridge_config)?;
243 lua.globals().set("alc", alc_table)?;
244
245 let ctx_value = lua.to_value(&lua_ctx)?;
246 lua.globals().set("ctx", ctx_value)?;
247
248 lua.load(PRELUDE)
249 .exec()
250 .map_err(|e| IsleError::Lua(format!("Prelude load failed: {e}")))?;
251
252 Ok("ok".to_string())
264 })
265 .await
266 .map_err(|e| format!("Session setup failed: {e}"))?;
267
268 let wrapped_code = format!("return alc.json_encode((function()\n{lua_code}\nend)())");
270 let exec_task = session_isle.spawn_coroutine_eval(&wrapped_code);
271
272 drop(session_isle);
275
276 Ok(Session::new(llm_rx, exec_task, metrics, session_driver))
277 }
278
279 #[allow(clippy::too_many_arguments)]
290 pub async fn start_session_with_env(
291 &self,
292 env_map: Arc<HashMap<String, String>>,
293 code: String,
294 ctx: serde_json::Value,
295 extra_lib_paths: Vec<PathBuf>,
296 variant_pkgs: Vec<VariantPkg>,
297 state_store: Arc<JsonFileStore>,
298 card_store: Arc<FileCardStore>,
299 scenarios_dir: PathBuf,
300 ) -> Result<Session, String> {
301 let spec = ExecutionSpec::new(code, ctx);
302 let metrics = ExecutionMetrics::new();
303
304 if let Some(budget) = Budget::from_ctx(&spec.ctx) {
305 metrics.set_budget(budget);
306 }
307
308 let (llm_tx, llm_rx) = tokio::sync::mpsc::channel::<LlmRequest>(16);
309
310 let mut effective = extra_lib_paths;
311 effective.extend(self.lib_paths.iter().cloned());
312
313 let log_sink = metrics.log_sink_handle();
314
315 let bridge_config = bridge::BridgeConfig {
316 llm_tx: Some(llm_tx),
317 ns: spec.namespace.clone(),
318 custom_metrics: metrics.custom_metrics_handle(),
319 stats: metrics.stats_handle(),
320 budget: metrics.budget_handle(),
321 progress: metrics.progress_handle(),
322 lib_paths: effective.clone(),
323 variant_pkgs: variant_pkgs.clone(),
324 state_store,
325 card_store,
326 scenarios_dir,
327 log_sink: Some(log_sink.clone()),
328 };
329 let lua_ctx = spec.ctx.clone();
330 let lua_code = spec.code.clone();
331
332 let (session_isle, session_driver) = AsyncIsle::spawn(move |lua| {
333 let mut reg = Registry::new();
334 register_variant_pkgs(&mut reg, &variant_pkgs);
335 for path in &effective {
336 if let Some(resolver) = make_resolver(path) {
337 reg.add(resolver);
338 }
339 }
340 reg.install(lua)?;
341 Ok(())
342 })
343 .await
344 .map_err(|e| format!("Session VM spawn failed: {e}"))?;
345
346 session_isle
347 .exec(move |lua| {
348 let alc_table = lua.create_table()?;
349 bridge::register(lua, &alc_table, bridge_config)?;
350 bridge::register_env(lua, &alc_table, env_map)
353 .map_err(|e| IsleError::Lua(e.to_string()))?;
354 lua.globals().set("alc", alc_table)?;
355
356 let ctx_value = lua.to_value(&lua_ctx)?;
357 lua.globals().set("ctx", ctx_value)?;
358
359 lua.load(PRELUDE)
360 .exec()
361 .map_err(|e| IsleError::Lua(format!("Prelude load failed: {e}")))?;
362
363 Ok("ok".to_string())
364 })
365 .await
366 .map_err(|e| format!("Session setup failed: {e}"))?;
367
368 let wrapped_code = format!("return alc.json_encode((function()\n{lua_code}\nend)())");
369 let exec_task = session_isle.spawn_coroutine_eval(&wrapped_code);
370
371 drop(session_isle);
372
373 Ok(Session::new(llm_rx, exec_task, metrics, session_driver))
374 }
375}
376
377#[cfg(test)]
378mod tests {
379 use super::*;
380 use std::fs;
381
382 fn make_pkg_dir(parent: &std::path::Path, pkg_name: &str, init_lua: &str) -> PathBuf {
384 let pkg_dir = parent.join(pkg_name);
385 fs::create_dir_all(&pkg_dir).unwrap();
386 fs::write(pkg_dir.join("init.lua"), init_lua).unwrap();
387 parent.to_path_buf()
388 }
389
390 #[tokio::test]
392 async fn no_extra_lib_paths_eval_simple() {
393 let executor = Executor::new(vec![]).await.unwrap();
394 let result = executor.eval_simple("return 42".to_string()).await.unwrap();
395 assert_eq!(result, serde_json::json!(42));
396 }
397
398 #[tokio::test]
403 async fn extra_lib_paths_reachable_via_eval_simple_with_paths() {
404 let tmp = tempfile::tempdir().unwrap();
405 let pkg_root = make_pkg_dir(tmp.path(), "test_pkg", "return { value = 99 }");
406
407 let executor = Executor::new(vec![]).await.unwrap();
408 let code = r#"
409 local pkg = require("test_pkg")
410 return pkg.value
411 "#
412 .to_string();
413
414 let result = executor
415 .eval_simple_with_paths(code, vec![pkg_root], vec![])
416 .await
417 .unwrap();
418
419 assert_eq!(result, serde_json::json!(99));
420 }
421
422 #[tokio::test]
425 async fn variant_pkg_resolves_root_and_submodule() {
426 let tmp = tempfile::tempdir().unwrap();
427 let pkg_dir = tmp.path().join("physical-dir");
430 fs::create_dir_all(&pkg_dir).unwrap();
431 fs::write(
432 pkg_dir.join("init.lua"),
433 "return { greet = function(n) return 'hi-' .. n end, sub = require('logical_name.sub') }",
434 )
435 .unwrap();
436 fs::write(pkg_dir.join("sub.lua"), "return { value = 7 }").unwrap();
437
438 let executor = Executor::new(vec![]).await.unwrap();
439 let code = r#"
440 local pkg = require("logical_name")
441 return { msg = pkg.greet("there"), sub_value = pkg.sub.value }
442 "#
443 .to_string();
444
445 let result = executor
446 .eval_simple_with_paths(code, vec![], vec![VariantPkg::new("logical_name", pkg_dir)])
447 .await
448 .unwrap();
449
450 assert_eq!(result["msg"], serde_json::json!("hi-there"));
451 assert_eq!(result["sub_value"], serde_json::json!(7));
452 }
453
454 #[tokio::test]
456 async fn variant_pkg_overrides_global_same_name() {
457 let global_tmp = tempfile::tempdir().unwrap();
458 let variant_tmp = tempfile::tempdir().unwrap();
459
460 make_pkg_dir(global_tmp.path(), "my_pkg", "return { value = 1 }");
462 let variant_dir = variant_tmp.path().join("my_pkg");
464 fs::create_dir_all(&variant_dir).unwrap();
465 fs::write(variant_dir.join("init.lua"), "return { value = 2 }").unwrap();
466
467 let executor = Executor::new(vec![global_tmp.path().to_path_buf()])
468 .await
469 .unwrap();
470
471 let code = r#"
472 local pkg = require("my_pkg")
473 return pkg.value
474 "#
475 .to_string();
476
477 let result = executor
478 .eval_simple_with_paths(code, vec![], vec![VariantPkg::new("my_pkg", variant_dir)])
479 .await
480 .unwrap();
481
482 assert_eq!(result, serde_json::json!(2));
483 }
484
485 #[tokio::test]
488 async fn extra_lib_paths_priority_over_default() {
489 let global_tmp = tempfile::tempdir().unwrap();
490 let extra_tmp = tempfile::tempdir().unwrap();
491
492 make_pkg_dir(global_tmp.path(), "test_pkg", "return { value = 1 }");
494 let extra_root = make_pkg_dir(extra_tmp.path(), "test_pkg", "return { value = 2 }");
496
497 let executor = Executor::new(vec![global_tmp.path().to_path_buf()])
499 .await
500 .unwrap();
501
502 let code = r#"
503 local pkg = require("test_pkg")
504 return pkg.value
505 "#
506 .to_string();
507
508 let result = executor
509 .eval_simple_with_paths(code, vec![extra_root], vec![])
510 .await
511 .unwrap();
512
513 assert_eq!(result, serde_json::json!(2));
515 }
516}