Skip to main content

xs/nu/
engine.rs

1use nu_cli::{add_cli_context, gather_parent_env_vars};
2use nu_cmd_lang::create_default_context;
3use nu_command::add_shell_command_context;
4use nu_engine::eval_block_with_early_return;
5use nu_parser::parse;
6use nu_protocol::debugger::WithoutDebug;
7use nu_protocol::engine::{Closure, Command, EngineState, Redirection, Stack, StateWorkingSet};
8use nu_protocol::engine::{Job, ThreadJob};
9use nu_protocol::{OutDest, PipelineData, ShellError, Span, Value};
10
11use crate::error::Error;
12use crate::nu::commands;
13use crate::store::Store;
14
15#[derive(Clone)]
16pub struct Engine {
17    pub state: EngineState,
18}
19
20impl Engine {
21    pub fn new() -> Result<Self, Error> {
22        let mut engine_state = create_default_context();
23        engine_state = add_shell_command_context(engine_state);
24        engine_state = add_cli_context(engine_state);
25
26        let init_cwd = std::env::current_dir()?;
27        gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
28
29        Ok(Self {
30            state: engine_state,
31        })
32    }
33
34    pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
35        let mut working_set = StateWorkingSet::new(&self.state);
36        for command in commands {
37            working_set.add_decl(command);
38        }
39        self.state.merge_delta(working_set.render())?;
40        Ok(())
41    }
42
43    pub fn add_alias(&mut self, name: &str, target: &str) -> Result<(), Error> {
44        let mut working_set = StateWorkingSet::new(&self.state);
45        let _ = parse(
46            &mut working_set,
47            None,
48            format!("alias {name} = {target}").as_bytes(),
49            false,
50        );
51        self.state.merge_delta(working_set.render())?;
52        Ok(())
53    }
54
55    pub fn eval(&self, input: PipelineData, expression: String) -> Result<PipelineData, String> {
56        let mut working_set = StateWorkingSet::new(&self.state);
57        let block = parse(&mut working_set, None, expression.as_bytes(), false);
58
59        if !working_set.parse_errors.is_empty() {
60            let first_error = &working_set.parse_errors[0];
61            let formatted = nu_protocol::format_cli_error(None, &working_set, first_error, None);
62            return Err(formatted);
63        }
64
65        let mut engine_state = self.state.clone();
66        engine_state
67            .merge_delta(working_set.render())
68            .map_err(|e| {
69                let working_set = StateWorkingSet::new(&self.state);
70                nu_protocol::format_cli_error(None, &working_set, &e, None)
71            })?;
72
73        let mut stack = Stack::new();
74        let mut stack =
75            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
76
77        eval_block_with_early_return::<WithoutDebug>(&engine_state, &mut stack, &block, input)
78            .map(|exec_data| exec_data.body)
79            .map_err(|e| {
80                let working_set = StateWorkingSet::new(&engine_state);
81                nu_protocol::format_cli_error(None, &working_set, &e, None)
82            })
83    }
84
85    pub fn parse_closure(&mut self, script: &str) -> Result<Closure, Box<ShellError>> {
86        let mut working_set = StateWorkingSet::new(&self.state);
87        let block = parse(&mut working_set, None, script.as_bytes(), false);
88        self.state
89            .merge_delta(working_set.render())
90            .map_err(Box::new)?;
91
92        let mut stack = Stack::new();
93        let result = eval_block_with_early_return::<WithoutDebug>(
94            &self.state,
95            &mut stack,
96            &block,
97            PipelineData::empty(),
98        )
99        .map_err(Box::new)?;
100        let closure = result
101            .body
102            .into_value(Span::unknown())
103            .map_err(Box::new)?
104            .into_closure()
105            .map_err(Box::new)?;
106
107        self.state.merge_env(&mut stack).map_err(Box::new)?;
108
109        Ok(closure)
110    }
111
112    pub fn add_module(&mut self, name: &str, content: &str) -> Result<(), Box<ShellError>> {
113        let mut working_set = StateWorkingSet::new(&self.state);
114
115        // Create temporary file with .nu extension that will be cleaned up when temp_dir is dropped
116        let temp_dir = tempfile::TempDir::new().map_err(|e| {
117            Box::new(ShellError::GenericError {
118                error: "I/O Error".into(),
119                msg: format!("Failed to create temporary directory for module '{name}': {e}"),
120                span: Some(Span::unknown()),
121                help: None,
122                inner: vec![],
123            })
124        })?;
125        let module_path = temp_dir.path().join(format!("{name}.nu"));
126        std::fs::write(&module_path, content).map_err(|e| {
127            Box::new(ShellError::GenericError {
128                error: "I/O Error".into(),
129                msg: e.to_string(),
130                span: Some(Span::unknown()),
131                help: None,
132                inner: vec![],
133            })
134        })?;
135
136        // Parse the use statement
137        let use_stmt = format!("use {}", module_path.display());
138        let _block = parse(&mut working_set, None, use_stmt.as_bytes(), false);
139
140        // Check for parse errors
141        if !working_set.parse_errors.is_empty() {
142            let first_error = &working_set.parse_errors[0];
143            return Err(Box::new(ShellError::GenericError {
144                error: "Parse error".into(),
145                msg: first_error.to_string(),
146                span: Some(first_error.span()),
147                help: None,
148                inner: vec![],
149            }));
150        }
151
152        // Merge changes into engine state
153        self.state
154            .merge_delta(working_set.render())
155            .map_err(Box::new)?;
156
157        // Create a temporary stack and evaluate the module
158        let mut stack = Stack::new();
159        let _ = eval_block_with_early_return::<WithoutDebug>(
160            &self.state,
161            &mut stack,
162            &_block,
163            PipelineData::empty(),
164        )
165        .map_err(Box::new)?;
166
167        // Merge environment variables into engine state
168        self.state.merge_env(&mut stack).map_err(Box::new)?;
169
170        Ok(())
171    }
172
173    pub fn with_env_vars(
174        mut self,
175        vars: impl IntoIterator<Item = (String, String)>,
176    ) -> Result<Self, Error> {
177        for (key, value) in vars {
178            self.state
179                .add_env_var(key, nu_protocol::Value::string(value, Span::unknown()));
180        }
181
182        Ok(self)
183    }
184
185    pub fn run_closure_in_job(
186        &mut self,
187        closure: &nu_protocol::engine::Closure,
188        args: Vec<Value>,
189        pipeline_input: Option<PipelineData>,
190        job_name: impl Into<String>,
191    ) -> Result<PipelineData, Box<ShellError>> {
192        let job_display_name = job_name.into(); // Convert job_name early for error messages
193
194        // -- create & register job (boilerplate) ---
195        let (sender, _rx) = std::sync::mpsc::channel();
196        let job = ThreadJob::new(
197            self.state.signals().clone(),
198            Some(job_display_name.clone()),
199            sender,
200        );
201        let _job_id = {
202            let mut j = self.state.jobs.lock().unwrap();
203            j.add_job(Job::Thread(job.clone()))
204        };
205
206        // -- temporarily attach the job to self.state (boilerplate) ---
207        let saved_bg_job = self.state.current_job.background_thread_job.clone();
208        self.state.current_job.background_thread_job = Some(job.clone());
209
210        // -- prepare stack & validate/inject positional arguments ---
211        let block = self.state.get_block(closure.block_id);
212        let mut stack = Stack::new();
213        let mut stack =
214            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
215
216        let num_required = block.signature.required_positional.len();
217        let num_optional = block.signature.optional_positional.len();
218        let total_positional = num_required + num_optional;
219
220        if args.len() > total_positional {
221            return Err(Box::new(ShellError::GenericError {
222                error: format!(
223                    "Too many arguments for job '{job_display_name}': got {}, closure accepts at most {total_positional}.",
224                    args.len()
225                ),
226                msg: format!("Closure signature: {name}", name = block.signature.name),
227                span: Some(block.span.unwrap_or_else(Span::unknown)),
228                help: None,
229                inner: vec![],
230            }));
231        }
232
233        if args.len() < num_required {
234            return Err(Box::new(ShellError::GenericError {
235                error: format!(
236                    "Job '{job_display_name}' run closure expects {num_required} required argument(s), but {} were provided.",
237                    args.len()
238                ),
239                msg: format!("Closure signature: {name}", name = block.signature.name),
240                span: Some(block.span.unwrap_or_else(Span::unknown)),
241                help: None,
242                inner: vec![],
243            }));
244        }
245
246        // Inject provided positional args
247        for (i, val) in args.iter().enumerate() {
248            let param = if i < num_required {
249                &block.signature.required_positional[i]
250            } else {
251                &block.signature.optional_positional[i - num_required]
252            };
253            if let Some(var_id) = param.var_id {
254                stack.add_var(var_id, val.clone());
255            }
256        }
257
258        // Set default values for optional params not covered by provided args
259        let optional_covered = args.len().saturating_sub(num_required);
260        for i in optional_covered..num_optional {
261            let param = &block.signature.optional_positional[i];
262            if let Some(var_id) = param.var_id {
263                let default = param
264                    .default_value
265                    .clone()
266                    .unwrap_or_else(|| Value::nothing(Span::unknown()));
267                stack.add_var(var_id, default);
268            }
269        }
270
271        // Determine the actual pipeline input for eval_block_with_early_return
272        let eval_pipeline_input = pipeline_input.unwrap_or_else(PipelineData::empty);
273
274        // -- run using eval_block_with_early_return ---
275        let eval_res = nu_engine::eval_block_with_early_return::<WithoutDebug>(
276            &self.state,
277            &mut stack,
278            block,
279            eval_pipeline_input,
280        );
281
282        // -- merge env, restore job, cleanup job (boilerplate, same as before) ---
283        if eval_res.is_ok() {
284            if let Err(e) = self.state.merge_env(&mut stack) {
285                tracing::error!(
286                    "Failed to merge environment from job '{}': {}",
287                    job_display_name,
288                    e
289                );
290            }
291        }
292
293        self.state.current_job.background_thread_job = saved_bg_job;
294        eval_res.map(|exec_data| exec_data.body).map_err(Box::new)
295    }
296
297    /// Kill the background ThreadJob whose name equals `name`.
298    pub fn kill_job_by_name(&self, name: &str) {
299        if let Ok(mut jobs) = self.state.jobs.lock() {
300            let job_id = {
301                jobs.iter().find_map(|(jid, job)| {
302                    job.tag()
303                        .and_then(|tag| if tag == name { Some(jid) } else { None })
304                })
305            };
306            if let Some(job_id) = job_id {
307                let _ = jobs.kill_and_remove(job_id);
308            }
309        }
310    }
311}
312
313/// Add core cross.stream commands that are common across all Nushell pipeline runners
314pub fn add_core_commands(engine: &mut Engine, store: &Store) -> Result<(), Error> {
315    engine.add_commands(vec![
316        Box::new(commands::cas_command::CasCommand::new(store.clone())),
317        Box::new(commands::get_command::GetCommand::new(store.clone())),
318        Box::new(commands::remove_command::RemoveCommand::new(store.clone())),
319        Box::new(commands::scru128_command::Scru128Command::new()),
320    ])
321}