Skip to main content

xs/nu/
engine.rs

1use nu_cli::{add_cli_context, gather_parent_env_vars};
2use nu_cmd_lang::create_default_context;
3use nu_command::add_shell_command_context;
4use nu_engine::eval_block_with_early_return;
5use nu_parser::parse;
6use nu_protocol::debugger::WithoutDebug;
7use nu_protocol::engine::{Closure, Command, EngineState, Redirection, Stack, StateWorkingSet};
8use nu_protocol::engine::{Job, ThreadJob};
9use nu_protocol::shell_error::generic::GenericError;
10use nu_protocol::{OutDest, PipelineData, ShellError, Span, Value};
11
12use std::sync::{Arc, Mutex};
13
14use serde_json::Value as JsonValue;
15
16use crate::error::Error;
17use crate::nu::commands;
18use crate::store::{Frame, Store};
19
20#[derive(Clone)]
21pub struct Engine {
22    pub state: EngineState,
23}
24
25impl Engine {
26    pub fn new() -> Result<Self, Error> {
27        let mut engine_state = create_default_context();
28        engine_state = add_shell_command_context(engine_state);
29        engine_state = add_cli_context(engine_state);
30        nu_std::load_standard_library(&mut engine_state)?;
31
32        let init_cwd = std::env::current_dir()?;
33        gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
34
35        Ok(Self {
36            state: engine_state,
37        })
38    }
39
40    pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
41        let mut working_set = StateWorkingSet::new(&self.state);
42        for command in commands {
43            working_set.add_decl(command);
44        }
45        self.state.merge_delta(working_set.render())?;
46        Ok(())
47    }
48
49    pub fn add_alias(&mut self, name: &str, target: &str) -> Result<(), Error> {
50        let mut working_set = StateWorkingSet::new(&self.state);
51        let _ = parse(
52            &mut working_set,
53            None,
54            format!("alias {name} = {target}").as_bytes(),
55            false,
56        );
57        self.state.merge_delta(working_set.render())?;
58        Ok(())
59    }
60
61    pub fn eval(&self, input: PipelineData, expression: String) -> Result<PipelineData, String> {
62        let mut working_set = StateWorkingSet::new(&self.state);
63        let block = parse(&mut working_set, None, expression.as_bytes(), false);
64
65        if !working_set.parse_errors.is_empty() {
66            let first_error = &working_set.parse_errors[0];
67            let formatted = nu_protocol::format_cli_error(None, &working_set, first_error, None);
68            return Err(formatted);
69        }
70
71        let mut engine_state = self.state.clone();
72        engine_state
73            .merge_delta(working_set.render())
74            .map_err(|e| {
75                let working_set = StateWorkingSet::new(&self.state);
76                nu_protocol::format_cli_error(None, &working_set, &e, None)
77            })?;
78
79        let mut stack = Stack::new();
80        let mut stack =
81            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
82
83        eval_block_with_early_return::<WithoutDebug>(&engine_state, &mut stack, &block, input)
84            .map(|exec_data| exec_data.body)
85            .map_err(|e| {
86                let working_set = StateWorkingSet::new(&engine_state);
87                nu_protocol::format_cli_error(None, &working_set, &e, None)
88            })
89    }
90
91    pub fn parse_closure(&mut self, script: &str) -> Result<Closure, Box<ShellError>> {
92        let mut working_set = StateWorkingSet::new(&self.state);
93        let block = parse(&mut working_set, None, script.as_bytes(), false);
94        self.state
95            .merge_delta(working_set.render())
96            .map_err(Box::new)?;
97
98        let mut stack = Stack::new();
99        let result = eval_block_with_early_return::<WithoutDebug>(
100            &self.state,
101            &mut stack,
102            &block,
103            PipelineData::empty(),
104        )
105        .map_err(Box::new)?;
106        let closure = result
107            .body
108            .into_value(Span::unknown())
109            .map_err(Box::new)?
110            .into_closure()
111            .map_err(Box::new)?;
112
113        self.state.merge_env(&mut stack).map_err(Box::new)?;
114
115        Ok(closure)
116    }
117
118    pub fn add_module(&mut self, name: &str, content: &str) -> Result<(), Box<ShellError>> {
119        let mut working_set = StateWorkingSet::new(&self.state);
120
121        // Create temporary file with .nu extension that will be cleaned up when temp_dir is dropped
122        let temp_dir = tempfile::TempDir::new().map_err(|e| {
123            Box::new(ShellError::Generic(GenericError::new_internal(
124                "I/O Error",
125                format!("Failed to create temporary directory for module '{name}': {e}"),
126            )))
127        })?;
128        let module_path = temp_dir.path().join(format!("{name}.nu"));
129        std::fs::write(&module_path, content).map_err(|e| {
130            Box::new(ShellError::Generic(GenericError::new_internal(
131                "I/O Error",
132                e.to_string(),
133            )))
134        })?;
135
136        // Parse the use statement
137        let use_stmt = format!("use {}", module_path.display());
138        let _block = parse(&mut working_set, None, use_stmt.as_bytes(), false);
139
140        // Check for parse errors
141        if !working_set.parse_errors.is_empty() {
142            let first_error = &working_set.parse_errors[0];
143            return Err(Box::new(ShellError::Generic(GenericError::new(
144                "Parse error",
145                first_error.to_string(),
146                first_error.span(),
147            ))));
148        }
149
150        // Merge changes into engine state
151        self.state
152            .merge_delta(working_set.render())
153            .map_err(Box::new)?;
154
155        // Create a temporary stack and evaluate the module
156        let mut stack = Stack::new();
157        let _ = eval_block_with_early_return::<WithoutDebug>(
158            &self.state,
159            &mut stack,
160            &_block,
161            PipelineData::empty(),
162        )
163        .map_err(Box::new)?;
164
165        // Merge environment variables into engine state
166        self.state.merge_env(&mut stack).map_err(Box::new)?;
167
168        Ok(())
169    }
170
171    pub fn with_env_vars(
172        mut self,
173        vars: impl IntoIterator<Item = (String, String)>,
174    ) -> Result<Self, Error> {
175        for (key, value) in vars {
176            self.state
177                .add_env_var(key, nu_protocol::Value::string(value, Span::unknown()));
178        }
179
180        Ok(self)
181    }
182
183    /// Set the base metadata stamped on frames this engine's `.append` writes
184    /// (`service_id`, `{action_id, frame_id}`, ...). Injected as `$env` so the
185    /// `.append` decl stays instance-independent. See `append_command`.
186    pub fn set_append_meta(&mut self, meta: &JsonValue) {
187        self.state.add_env_var(
188            crate::nu::commands::append_command::APPEND_META_ENV.to_string(),
189            Value::string(meta.to_string(), Span::unknown()),
190        );
191    }
192
193    pub fn run_closure_in_job(
194        &mut self,
195        closure: &nu_protocol::engine::Closure,
196        args: Vec<Value>,
197        pipeline_input: Option<PipelineData>,
198        job_name: impl Into<String>,
199    ) -> Result<PipelineData, Box<ShellError>> {
200        let job_display_name = job_name.into(); // Convert job_name early for error messages
201
202        // -- create & register job (boilerplate) ---
203        let (sender, _rx) = std::sync::mpsc::channel();
204        let job = ThreadJob::new(
205            self.state.signals().clone(),
206            Some(job_display_name.clone()),
207            sender,
208        );
209        let _job_id = {
210            let mut j = self.state.jobs.lock().unwrap();
211            j.add_job(Job::Thread(job.clone()))
212        };
213
214        // -- temporarily attach the job to self.state (boilerplate) ---
215        let saved_bg_job = self.state.current_job.background_thread_job.clone();
216        self.state.current_job.background_thread_job = Some(job.clone());
217
218        // -- prepare stack & validate/inject positional arguments ---
219        let block = self.state.get_block(closure.block_id);
220        let mut stack = Stack::new();
221        let mut stack =
222            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
223
224        let num_required = block.signature.required_positional.len();
225        let num_optional = block.signature.optional_positional.len();
226        let total_positional = num_required + num_optional;
227
228        if args.len() > total_positional {
229            return Err(Box::new(ShellError::Generic(GenericError::new(
230                format!(
231                    "Too many arguments for job '{job_display_name}': got {}, closure accepts at most {total_positional}.",
232                    args.len()
233                ),
234                format!("Closure signature: {name}", name = block.signature.name),
235                block.span.unwrap_or_else(Span::unknown),
236            ))));
237        }
238
239        if args.len() < num_required {
240            return Err(Box::new(ShellError::Generic(GenericError::new(
241                format!(
242                    "Job '{job_display_name}' run closure expects {num_required} required argument(s), but {} were provided.",
243                    args.len()
244                ),
245                format!("Closure signature: {name}", name = block.signature.name),
246                block.span.unwrap_or_else(Span::unknown),
247            ))));
248        }
249
250        // Inject provided positional args
251        for (i, val) in args.iter().enumerate() {
252            let param = if i < num_required {
253                &block.signature.required_positional[i]
254            } else {
255                &block.signature.optional_positional[i - num_required]
256            };
257            if let Some(var_id) = param.var_id {
258                stack.add_var(var_id, val.clone());
259            }
260        }
261
262        // Set default values for optional params not covered by provided args
263        let optional_covered = args.len().saturating_sub(num_required);
264        for i in optional_covered..num_optional {
265            let param = &block.signature.optional_positional[i];
266            if let Some(var_id) = param.var_id {
267                let default = param
268                    .default_value
269                    .clone()
270                    .unwrap_or_else(|| Value::nothing(Span::unknown()));
271                stack.add_var(var_id, default);
272            }
273        }
274
275        // Determine the actual pipeline input for eval_block_with_early_return
276        let eval_pipeline_input = pipeline_input.unwrap_or_else(PipelineData::empty);
277
278        // -- run using eval_block_with_early_return ---
279        let eval_res = nu_engine::eval_block_with_early_return::<WithoutDebug>(
280            &self.state,
281            &mut stack,
282            block,
283            eval_pipeline_input,
284        );
285
286        // -- merge env, restore job, cleanup job (boilerplate, same as before) ---
287        if eval_res.is_ok() {
288            if let Err(e) = self.state.merge_env(&mut stack) {
289                tracing::error!(
290                    "Failed to merge environment from job '{}': {}",
291                    job_display_name,
292                    e
293                );
294            }
295        }
296
297        self.state.current_job.background_thread_job = saved_bg_job;
298        eval_res.map(|exec_data| exec_data.body).map_err(Box::new)
299    }
300
301    /// Evaluate a closure WITHOUT creating and registering a fresh
302    /// `ThreadJob` per call. `run_closure_in_job` allocates an mpsc channel,
303    /// builds a `ThreadJob`, locks `self.state.jobs` and `add_job`s it -- and
304    /// never removes it -- on every invocation. In a hot per-frame actor loop
305    /// that churn (plus the unbounded jobs-table growth) dominated the cost.
306    /// The caller is expected to have attached a single long-lived background
307    /// job to `self.state` once (see the actor `EngineWorker`); this method
308    /// just sets up the stack, injects positional args, and evaluates.
309    pub fn eval_closure_no_job(
310        &mut self,
311        closure: &nu_protocol::engine::Closure,
312        args: Vec<Value>,
313        pipeline_input: Option<PipelineData>,
314    ) -> Result<PipelineData, Box<ShellError>> {
315        let block = self.state.get_block(closure.block_id);
316        let mut stack = Stack::new();
317        let mut stack =
318            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
319
320        let num_required = block.signature.required_positional.len();
321        let num_optional = block.signature.optional_positional.len();
322        let total_positional = num_required + num_optional;
323
324        if args.len() > total_positional {
325            return Err(Box::new(ShellError::Generic(GenericError::new(
326                format!(
327                    "Too many arguments for actor closure: got {}, closure accepts at most {total_positional}.",
328                    args.len()
329                ),
330                format!("Closure signature: {name}", name = block.signature.name),
331                block.span.unwrap_or_else(Span::unknown),
332            ))));
333        }
334
335        if args.len() < num_required {
336            return Err(Box::new(ShellError::Generic(GenericError::new(
337                format!(
338                    "Actor closure expects {num_required} required argument(s), but {} were provided.",
339                    args.len()
340                ),
341                format!("Closure signature: {name}", name = block.signature.name),
342                block.span.unwrap_or_else(Span::unknown),
343            ))));
344        }
345
346        for (i, val) in args.iter().enumerate() {
347            let param = if i < num_required {
348                &block.signature.required_positional[i]
349            } else {
350                &block.signature.optional_positional[i - num_required]
351            };
352            if let Some(var_id) = param.var_id {
353                stack.add_var(var_id, val.clone());
354            }
355        }
356
357        let optional_covered = args.len().saturating_sub(num_required);
358        for i in optional_covered..num_optional {
359            let param = &block.signature.optional_positional[i];
360            if let Some(var_id) = param.var_id {
361                let default = param
362                    .default_value
363                    .clone()
364                    .unwrap_or_else(|| Value::nothing(Span::unknown()));
365                stack.add_var(var_id, default);
366            }
367        }
368
369        let eval_pipeline_input = pipeline_input.unwrap_or_else(PipelineData::empty);
370        let eval_res = nu_engine::eval_block_with_early_return::<WithoutDebug>(
371            &self.state,
372            &mut stack,
373            block,
374            eval_pipeline_input,
375        );
376
377        if eval_res.is_ok() {
378            if let Err(e) = self.state.merge_env(&mut stack) {
379                tracing::error!("Failed to merge environment from actor closure: {}", e);
380            }
381        }
382
383        eval_res.map(|exec_data| exec_data.body).map_err(Box::new)
384    }
385
386    /// Attach a single long-lived background `ThreadJob` to this engine's
387    /// state. Call once before a hot eval loop so `eval_closure_no_job` can
388    /// skip per-call job creation. Signals still propagate (the job shares
389    /// `self.state.signals()`).
390    pub fn attach_background_job(&mut self, name: impl Into<String>) {
391        let (sender, _rx) = std::sync::mpsc::channel();
392        let job = ThreadJob::new(self.state.signals().clone(), Some(name.into()), sender);
393        {
394            let mut j = self.state.jobs.lock().unwrap();
395            j.add_job(Job::Thread(job.clone()));
396        }
397        self.state.current_job.background_thread_job = Some(job);
398    }
399
400    /// Kill the background ThreadJob whose name equals `name`.
401    pub fn kill_job_by_name(&self, name: &str) {
402        if let Ok(mut jobs) = self.state.jobs.lock() {
403            let job_id = {
404                jobs.iter().find_map(|(jid, job)| {
405                    job.description()
406                        .and_then(|desc| if desc == name { Some(jid) } else { None })
407                })
408            };
409            if let Some(job_id) = job_id {
410                let _ = jobs.kill_and_remove(job_id);
411            }
412        }
413    }
414}
415
416/// Add core cross.stream commands that are common across all Nushell pipeline runners
417pub fn add_core_commands(engine: &mut Engine, store: &Store) -> Result<(), Error> {
418    engine.add_commands(vec![
419        Box::new(commands::cas_command::CasCommand::new(store.clone())),
420        Box::new(commands::get_command::GetCommand::new(store.clone())),
421        Box::new(commands::remove_command::RemoveCommand::new(store.clone())),
422        Box::new(commands::scru128_command::Scru128Command::new()),
423    ])
424}
425
426/// Which `.cat`/`.last` flavor a pipeline runner exposes.
427pub enum ReadMode {
428    /// Streaming readers that support `--follow` (eval, actions, services).
429    Stream,
430    /// Collected readers without follow (actors).
431    Plain,
432}
433
434/// How `.append` behaves. This is the one store command whose behaviour
435/// genuinely varies by runner.
436pub enum AppendMode {
437    /// Write straight to the store. The per-instance base metadata is injected
438    /// via the `XS_APPEND_META` env var (see `append_command`), not baked in.
439    Direct,
440    /// Buffer appended frames into `output` for the caller to flush (actors).
441    Buffered(Arc<Mutex<Vec<Frame>>>),
442}
443
444/// Register the `.cat` and `.last` read builtins for a pipeline runner.
445pub fn add_read_commands(engine: &mut Engine, store: &Store, mode: ReadMode) -> Result<(), Error> {
446    match mode {
447        ReadMode::Stream => engine.add_commands(vec![
448            Box::new(commands::cat_stream_command::CatStreamCommand::new(
449                store.clone(),
450            )),
451            Box::new(commands::last_stream_command::LastStreamCommand::new(
452                store.clone(),
453            )),
454        ]),
455        ReadMode::Plain => engine.add_commands(vec![
456            Box::new(commands::cat_command::CatCommand::new(store.clone())),
457            Box::new(commands::last_command::LastCommand::new(store.clone())),
458        ]),
459    }
460}
461
462/// Register the write builtins for a pipeline runner: `.append` (per `mode`)
463/// plus `.import` and `.cas-post`, which are identical across runners.
464pub fn add_write_commands(
465    engine: &mut Engine,
466    store: &Store,
467    mode: AppendMode,
468) -> Result<(), Error> {
469    engine.add_commands(vec![
470        Box::new(commands::import_command::ImportCommand::new(store.clone())),
471        Box::new(commands::cas_post_command::CasPostCommand::new(
472            store.clone(),
473        )),
474    ])?;
475    match mode {
476        AppendMode::Direct => engine.add_commands(vec![Box::new(
477            commands::append_command::AppendCommand::new(store.clone()),
478        )]),
479        AppendMode::Buffered(output) => engine.add_commands(vec![Box::new(
480            commands::append_command_buffered::AppendCommand::new(store.clone(), output),
481        )]),
482    }
483}
484
485/// The module-free, instance-free base engine for a runner: nushell + stdlib +
486/// the core builtins + the `.rm` alias + the read builtins, plus the `.append`
487/// write builtin for direct writers. Build this once per runner and `clone()` it per
488/// spawn or restart; `load_modules(as_of)` and `set_append_meta(..)` specialize
489/// each clone. Actors pass `direct_write: false` and add their per-instance
490/// buffered `.append` to the clone.
491pub fn prepared_base(store: &Store, read: ReadMode, direct_write: bool) -> Result<Engine, Error> {
492    // Clone the embedder's base engine when the store carries one, else build
493    // the default. See ADR 0007.
494    let mut engine = match store.base_engine() {
495        Some(base) => Engine {
496            state: base.clone(),
497        },
498        None => Engine::new()?,
499    };
500    add_core_commands(&mut engine, store)?;
501    engine.add_alias(".rm", ".remove")?;
502    add_read_commands(&mut engine, store, read)?;
503    if direct_write {
504        add_write_commands(&mut engine, store, AppendMode::Direct)?;
505    }
506    Ok(engine)
507}