xs/nu/
engine.rs

1use nu_cli::{add_cli_context, gather_parent_env_vars};
2use nu_cmd_lang::create_default_context;
3use nu_command::add_shell_command_context;
4use nu_engine::eval_block_with_early_return;
5use nu_parser::parse;
6use nu_protocol::debugger::WithoutDebug;
7use nu_protocol::engine::{Closure, Command, EngineState, Redirection, Stack, StateWorkingSet};
8use nu_protocol::engine::{Job, ThreadJob};
9use nu_protocol::{OutDest, PipelineData, ShellError, Span, Value};
10
11use crate::error::Error;
12use crate::nu::commands;
13use crate::store::Store;
14
15#[derive(Clone)]
16pub struct Engine {
17    pub state: EngineState,
18}
19
20impl Engine {
21    pub fn new() -> Result<Self, Error> {
22        let mut engine_state = create_default_context();
23        engine_state = add_shell_command_context(engine_state);
24        engine_state = add_cli_context(engine_state);
25
26        let init_cwd = std::env::current_dir()?;
27        gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
28
29        Ok(Self {
30            state: engine_state,
31        })
32    }
33
34    pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
35        let mut working_set = StateWorkingSet::new(&self.state);
36        for command in commands {
37            working_set.add_decl(command);
38        }
39        self.state.merge_delta(working_set.render())?;
40        Ok(())
41    }
42
43    pub fn add_alias(&mut self, name: &str, target: &str) -> Result<(), Error> {
44        let mut working_set = StateWorkingSet::new(&self.state);
45        let _ = parse(
46            &mut working_set,
47            None,
48            format!("alias {name} = {target}").as_bytes(),
49            false,
50        );
51        self.state.merge_delta(working_set.render())?;
52        Ok(())
53    }
54
55    pub fn eval(&self, input: PipelineData, expression: String) -> Result<PipelineData, String> {
56        let mut working_set = StateWorkingSet::new(&self.state);
57        let block = parse(&mut working_set, None, expression.as_bytes(), false);
58
59        if !working_set.parse_errors.is_empty() {
60            let first_error = &working_set.parse_errors[0];
61            let formatted = nu_protocol::format_cli_error(&working_set, first_error, None);
62            return Err(formatted);
63        }
64
65        let mut engine_state = self.state.clone();
66        engine_state
67            .merge_delta(working_set.render())
68            .map_err(|e| {
69                let working_set = StateWorkingSet::new(&self.state);
70                nu_protocol::format_cli_error(&working_set, &e, None)
71            })?;
72
73        let mut stack = Stack::new();
74        let mut stack =
75            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
76
77        eval_block_with_early_return::<WithoutDebug>(&engine_state, &mut stack, &block, input)
78            .map(|exec_data| exec_data.body)
79            .map_err(|e| {
80                let working_set = StateWorkingSet::new(&engine_state);
81                nu_protocol::format_cli_error(&working_set, &e, None)
82            })
83    }
84
85    pub fn parse_closure(&mut self, script: &str) -> Result<Closure, Box<ShellError>> {
86        let mut working_set = StateWorkingSet::new(&self.state);
87        let block = parse(&mut working_set, None, script.as_bytes(), false);
88        self.state
89            .merge_delta(working_set.render())
90            .map_err(Box::new)?;
91
92        let mut stack = Stack::new();
93        let result = eval_block_with_early_return::<WithoutDebug>(
94            &self.state,
95            &mut stack,
96            &block,
97            PipelineData::empty(),
98        )
99        .map_err(Box::new)?;
100        let closure = result
101            .body
102            .into_value(Span::unknown())
103            .map_err(Box::new)?
104            .into_closure()
105            .map_err(Box::new)?;
106
107        self.state.merge_env(&mut stack).map_err(Box::new)?;
108
109        Ok(closure)
110    }
111
112    pub fn add_module(&mut self, name: &str, content: &str) -> Result<(), Box<ShellError>> {
113        let mut working_set = StateWorkingSet::new(&self.state);
114
115        // Create temporary file with .nu extension that will be cleaned up when temp_dir is dropped
116        let temp_dir = tempfile::TempDir::new().map_err(|e| {
117            Box::new(ShellError::GenericError {
118                error: "I/O Error".into(),
119                msg: format!("Failed to create temporary directory for module '{name}': {e}"),
120                span: Some(Span::unknown()),
121                help: None,
122                inner: vec![],
123            })
124        })?;
125        let module_path = temp_dir.path().join(format!("{name}.nu"));
126        std::fs::write(&module_path, content).map_err(|e| {
127            Box::new(ShellError::GenericError {
128                error: "I/O Error".into(),
129                msg: e.to_string(),
130                span: Some(Span::unknown()),
131                help: None,
132                inner: vec![],
133            })
134        })?;
135
136        // Parse the use statement
137        let use_stmt = format!("use {}", module_path.display());
138        let _block = parse(&mut working_set, None, use_stmt.as_bytes(), false);
139
140        // Check for parse errors
141        if !working_set.parse_errors.is_empty() {
142            let first_error = &working_set.parse_errors[0];
143            return Err(Box::new(ShellError::GenericError {
144                error: "Parse error".into(),
145                msg: first_error.to_string(),
146                span: Some(first_error.span()),
147                help: None,
148                inner: vec![],
149            }));
150        }
151
152        // Merge changes into engine state
153        self.state
154            .merge_delta(working_set.render())
155            .map_err(Box::new)?;
156
157        // Create a temporary stack and evaluate the module
158        let mut stack = Stack::new();
159        let _ = eval_block_with_early_return::<WithoutDebug>(
160            &self.state,
161            &mut stack,
162            &_block,
163            PipelineData::empty(),
164        )
165        .map_err(Box::new)?;
166
167        // Merge environment variables into engine state
168        self.state.merge_env(&mut stack).map_err(Box::new)?;
169
170        Ok(())
171    }
172
173    pub fn with_env_vars(
174        mut self,
175        vars: impl IntoIterator<Item = (String, String)>,
176    ) -> Result<Self, Error> {
177        for (key, value) in vars {
178            self.state
179                .add_env_var(key, nu_protocol::Value::string(value, Span::unknown()));
180        }
181
182        Ok(self)
183    }
184
185    pub fn run_closure_in_job(
186        &mut self,
187        closure: &nu_protocol::engine::Closure,
188        arg: Option<Value>,
189        pipeline_input: Option<PipelineData>,
190        job_name: impl Into<String>,
191    ) -> Result<PipelineData, Box<ShellError>> {
192        let job_display_name = job_name.into(); // Convert job_name early for error messages
193
194        // -- create & register job (boilerplate) ---
195        let (sender, _rx) = std::sync::mpsc::channel();
196        let job = ThreadJob::new(
197            self.state.signals().clone(),
198            Some(job_display_name.clone()),
199            sender,
200        );
201        let _job_id = {
202            let mut j = self.state.jobs.lock().unwrap();
203            j.add_job(Job::Thread(job.clone()))
204        };
205
206        // -- temporarily attach the job to self.state (boilerplate) ---
207        let saved_bg_job = self.state.current_job.background_thread_job.clone();
208        self.state.current_job.background_thread_job = Some(job.clone());
209
210        // -- prepare stack & validate/inject optional single Value argument ---
211        let block = self.state.get_block(closure.block_id);
212        let mut stack = Stack::new();
213        let mut stack =
214            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
215
216        let num_required_pos = block.signature.required_positional.len();
217        // let num_optional_pos = block.signature.optional_positional.len(); // Not checking optional for now
218
219        match arg {
220            Some(val_to_set_as_arg) => {
221                if num_required_pos == 1 {
222                    // Simplistic: assumes if an arg is given, it's for the first required one.
223                    // Could be extended for multiple args if `arg` became `Vec<Value>`.
224                    if let Some(var_id) = block.signature.required_positional[0].var_id {
225                        stack.add_var(var_id, val_to_set_as_arg);
226                    } else {
227                        // This case should ideally not happen if parsing is correct.
228                        return Err(Box::new(ShellError::GenericError{
229                            error: format!("Closure for job '{job_display_name}' expects an argument but its definition is missing a variable ID."),
230                            msg: "Internal error: argument variable ID not found.".into(),
231                            span: Some(block.span.unwrap_or_else(Span::unknown)),
232                            help: None,
233                            inner: vec![],
234                        }));
235                    }
236                } else if num_required_pos == 0 {
237                    return Err(Box::new(ShellError::GenericError{
238                        error: format!("Argument provided to job '{job_display_name}', but its run closure takes no arguments."),
239                        msg: format!("Closure signature: {name}. Provided argument type: {typ:?}", name = block.signature.name, typ = val_to_set_as_arg.get_type()),
240                        span: Some(val_to_set_as_arg.span()),
241                        help: Some("Remove the argument or modify the closure to accept one.".into()),
242                        inner: vec![],
243                    }));
244                } else {
245                    // num_required_pos > 1
246                    return Err(Box::new(ShellError::GenericError{
247                        error: format!("Single argument provided to job '{job_display_name}', but its run closure expects {num_required_pos} arguments."),
248                        msg: format!("Closure signature: {name}. Provided argument type: {typ:?}", name = block.signature.name, typ = val_to_set_as_arg.get_type()),
249                        span: Some(val_to_set_as_arg.span()),
250                        help: Some(format!("Provide {num_required_pos} arguments or modify the closure.")),
251                        inner: vec![],
252                    }));
253                }
254            }
255            None => {
256                // No explicit `arg` provided. Check if closure *requires* one.
257                if num_required_pos > 0 {
258                    // We could allow $in to fulfill the first argument if `pipeline_input` is Some Value,
259                    // but that makes the contract less clear. Stricter is better here.
260                    // If $in is supposed to be the argument, caller should convert PipelineData::Value to Option<Value> for `arg`.
261                    return Err(Box::new(ShellError::GenericError {
262                        error: format!(
263                            "Job '{job_display_name}' run closure expects {num_required_pos} argument(s), but none were provided."
264                        ),
265                        msg: format!("Closure signature: {name}", name = block.signature.name),
266                        span: Some(block.span.unwrap_or_else(Span::unknown)),
267                        help: Some(format!(
268                            "Provide {num_required_pos} argument(s) or modify the closure."
269                        )),
270                        inner: vec![],
271                    }));
272                }
273                // If num_required_pos is 0 and arg is None, this is fine.
274            }
275        }
276
277        // Determine the actual pipeline input for eval_block_with_early_return
278        let eval_pipeline_input = pipeline_input.unwrap_or_else(PipelineData::empty);
279
280        // -- run using eval_block_with_early_return ---
281        let eval_res = nu_engine::eval_block_with_early_return::<WithoutDebug>(
282            &self.state,
283            &mut stack,
284            block,
285            eval_pipeline_input,
286        );
287
288        // -- merge env, restore job, cleanup job (boilerplate, same as before) ---
289        if eval_res.is_ok() {
290            if let Err(e) = self.state.merge_env(&mut stack) {
291                tracing::error!(
292                    "Failed to merge environment from job '{}': {}",
293                    job_display_name,
294                    e
295                );
296            }
297        }
298
299        self.state.current_job.background_thread_job = saved_bg_job;
300        eval_res.map(|exec_data| exec_data.body).map_err(Box::new)
301    }
302
303    /// Kill the background ThreadJob whose name equals `name`.
304    pub fn kill_job_by_name(&self, name: &str) {
305        if let Ok(mut jobs) = self.state.jobs.lock() {
306            let job_id = {
307                jobs.iter().find_map(|(jid, job)| {
308                    job.tag()
309                        .and_then(|tag| if tag == name { Some(jid) } else { None })
310                })
311            };
312            if let Some(job_id) = job_id {
313                let _ = jobs.kill_and_remove(job_id);
314            }
315        }
316    }
317}
318
319/// Add core cross.stream commands that are common across all Nushell pipeline runners
320pub fn add_core_commands(engine: &mut Engine, store: &Store) -> Result<(), Error> {
321    engine.add_commands(vec![
322        Box::new(commands::cas_command::CasCommand::new(store.clone())),
323        Box::new(commands::get_command::GetCommand::new(store.clone())),
324        Box::new(commands::remove_command::RemoveCommand::new(store.clone())),
325        Box::new(commands::scru128_command::Scru128Command::new()),
326    ])
327}