Skip to main content

xs/nu/
engine.rs

1use nu_cli::{add_cli_context, gather_parent_env_vars};
2use nu_cmd_lang::create_default_context;
3use nu_command::add_shell_command_context;
4use nu_engine::eval_block_with_early_return;
5use nu_parser::parse;
6use nu_protocol::debugger::WithoutDebug;
7use nu_protocol::engine::{Closure, Command, EngineState, Redirection, Stack, StateWorkingSet};
8use nu_protocol::engine::{Job, ThreadJob};
9use nu_protocol::shell_error::generic::GenericError;
10use nu_protocol::{OutDest, PipelineData, ShellError, Span, Value};
11
12use std::sync::{Arc, Mutex};
13
14use serde_json::Value as JsonValue;
15
16use crate::error::Error;
17use crate::nu::commands;
18use crate::store::{Frame, Store};
19
20#[derive(Clone)]
21pub struct Engine {
22    pub state: EngineState,
23}
24
25impl Engine {
26    pub fn new() -> Result<Self, Error> {
27        let mut engine_state = create_default_context();
28        engine_state = add_shell_command_context(engine_state);
29        engine_state = add_cli_context(engine_state);
30        nu_std::load_standard_library(&mut engine_state)?;
31
32        let init_cwd = std::env::current_dir()?;
33        gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
34
35        Ok(Self {
36            state: engine_state,
37        })
38    }
39
40    pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
41        let mut working_set = StateWorkingSet::new(&self.state);
42        for command in commands {
43            working_set.add_decl(command);
44        }
45        self.state.merge_delta(working_set.render())?;
46        Ok(())
47    }
48
49    pub fn add_alias(&mut self, name: &str, target: &str) -> Result<(), Error> {
50        let mut working_set = StateWorkingSet::new(&self.state);
51        let _ = parse(
52            &mut working_set,
53            None,
54            format!("alias {name} = {target}").as_bytes(),
55            false,
56        );
57        self.state.merge_delta(working_set.render())?;
58        Ok(())
59    }
60
61    pub fn eval(&self, input: PipelineData, expression: String) -> Result<PipelineData, String> {
62        let mut working_set = StateWorkingSet::new(&self.state);
63        let block = parse(&mut working_set, None, expression.as_bytes(), false);
64
65        if !working_set.parse_errors.is_empty() {
66            let first_error = &working_set.parse_errors[0];
67            let formatted = nu_protocol::format_cli_error(None, &working_set, first_error, None);
68            return Err(formatted);
69        }
70
71        let mut engine_state = self.state.clone();
72        engine_state
73            .merge_delta(working_set.render())
74            .map_err(|e| {
75                let working_set = StateWorkingSet::new(&self.state);
76                nu_protocol::format_cli_error(None, &working_set, &e, None)
77            })?;
78
79        let mut stack = Stack::new();
80        let mut stack =
81            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
82
83        eval_block_with_early_return::<WithoutDebug>(&engine_state, &mut stack, &block, input)
84            .map(|exec_data| exec_data.body)
85            .map_err(|e| {
86                let working_set = StateWorkingSet::new(&engine_state);
87                nu_protocol::format_cli_error(None, &working_set, &e, None)
88            })
89    }
90
91    pub fn parse_closure(&mut self, script: &str) -> Result<Closure, Box<ShellError>> {
92        let mut working_set = StateWorkingSet::new(&self.state);
93        let block = parse(&mut working_set, None, script.as_bytes(), false);
94        self.state
95            .merge_delta(working_set.render())
96            .map_err(Box::new)?;
97
98        let mut stack = Stack::new();
99        let result = eval_block_with_early_return::<WithoutDebug>(
100            &self.state,
101            &mut stack,
102            &block,
103            PipelineData::empty(),
104        )
105        .map_err(Box::new)?;
106        let closure = result
107            .body
108            .into_value(Span::unknown())
109            .map_err(Box::new)?
110            .into_closure()
111            .map_err(Box::new)?;
112
113        self.state.merge_env(&mut stack).map_err(Box::new)?;
114
115        Ok(closure)
116    }
117
118    pub fn add_module(&mut self, name: &str, content: &str) -> Result<(), Box<ShellError>> {
119        let mut working_set = StateWorkingSet::new(&self.state);
120
121        // Create temporary file with .nu extension that will be cleaned up when temp_dir is dropped
122        let temp_dir = tempfile::TempDir::new().map_err(|e| {
123            Box::new(ShellError::Generic(GenericError::new_internal(
124                "I/O Error",
125                format!("Failed to create temporary directory for module '{name}': {e}"),
126            )))
127        })?;
128        let module_path = temp_dir.path().join(format!("{name}.nu"));
129        std::fs::write(&module_path, content).map_err(|e| {
130            Box::new(ShellError::Generic(GenericError::new_internal(
131                "I/O Error",
132                e.to_string(),
133            )))
134        })?;
135
136        // Parse the use statement
137        let use_stmt = format!("use {}", module_path.display());
138        let _block = parse(&mut working_set, None, use_stmt.as_bytes(), false);
139
140        // Check for parse errors
141        if !working_set.parse_errors.is_empty() {
142            let first_error = &working_set.parse_errors[0];
143            return Err(Box::new(ShellError::Generic(GenericError::new(
144                "Parse error",
145                first_error.to_string(),
146                first_error.span(),
147            ))));
148        }
149
150        // Merge changes into engine state
151        self.state
152            .merge_delta(working_set.render())
153            .map_err(Box::new)?;
154
155        // Create a temporary stack and evaluate the module
156        let mut stack = Stack::new();
157        let _ = eval_block_with_early_return::<WithoutDebug>(
158            &self.state,
159            &mut stack,
160            &_block,
161            PipelineData::empty(),
162        )
163        .map_err(Box::new)?;
164
165        // Merge environment variables into engine state
166        self.state.merge_env(&mut stack).map_err(Box::new)?;
167
168        Ok(())
169    }
170
171    pub fn with_env_vars(
172        mut self,
173        vars: impl IntoIterator<Item = (String, String)>,
174    ) -> Result<Self, Error> {
175        for (key, value) in vars {
176            self.state
177                .add_env_var(key, nu_protocol::Value::string(value, Span::unknown()));
178        }
179
180        Ok(self)
181    }
182
183    pub fn run_closure_in_job(
184        &mut self,
185        closure: &nu_protocol::engine::Closure,
186        args: Vec<Value>,
187        pipeline_input: Option<PipelineData>,
188        job_name: impl Into<String>,
189    ) -> Result<PipelineData, Box<ShellError>> {
190        let job_display_name = job_name.into(); // Convert job_name early for error messages
191
192        // -- create & register job (boilerplate) ---
193        let (sender, _rx) = std::sync::mpsc::channel();
194        let job = ThreadJob::new(
195            self.state.signals().clone(),
196            Some(job_display_name.clone()),
197            sender,
198        );
199        let _job_id = {
200            let mut j = self.state.jobs.lock().unwrap();
201            j.add_job(Job::Thread(job.clone()))
202        };
203
204        // -- temporarily attach the job to self.state (boilerplate) ---
205        let saved_bg_job = self.state.current_job.background_thread_job.clone();
206        self.state.current_job.background_thread_job = Some(job.clone());
207
208        // -- prepare stack & validate/inject positional arguments ---
209        let block = self.state.get_block(closure.block_id);
210        let mut stack = Stack::new();
211        let mut stack =
212            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
213
214        let num_required = block.signature.required_positional.len();
215        let num_optional = block.signature.optional_positional.len();
216        let total_positional = num_required + num_optional;
217
218        if args.len() > total_positional {
219            return Err(Box::new(ShellError::Generic(GenericError::new(
220                format!(
221                    "Too many arguments for job '{job_display_name}': got {}, closure accepts at most {total_positional}.",
222                    args.len()
223                ),
224                format!("Closure signature: {name}", name = block.signature.name),
225                block.span.unwrap_or_else(Span::unknown),
226            ))));
227        }
228
229        if args.len() < num_required {
230            return Err(Box::new(ShellError::Generic(GenericError::new(
231                format!(
232                    "Job '{job_display_name}' run closure expects {num_required} required argument(s), but {} were provided.",
233                    args.len()
234                ),
235                format!("Closure signature: {name}", name = block.signature.name),
236                block.span.unwrap_or_else(Span::unknown),
237            ))));
238        }
239
240        // Inject provided positional args
241        for (i, val) in args.iter().enumerate() {
242            let param = if i < num_required {
243                &block.signature.required_positional[i]
244            } else {
245                &block.signature.optional_positional[i - num_required]
246            };
247            if let Some(var_id) = param.var_id {
248                stack.add_var(var_id, val.clone());
249            }
250        }
251
252        // Set default values for optional params not covered by provided args
253        let optional_covered = args.len().saturating_sub(num_required);
254        for i in optional_covered..num_optional {
255            let param = &block.signature.optional_positional[i];
256            if let Some(var_id) = param.var_id {
257                let default = param
258                    .default_value
259                    .clone()
260                    .unwrap_or_else(|| Value::nothing(Span::unknown()));
261                stack.add_var(var_id, default);
262            }
263        }
264
265        // Determine the actual pipeline input for eval_block_with_early_return
266        let eval_pipeline_input = pipeline_input.unwrap_or_else(PipelineData::empty);
267
268        // -- run using eval_block_with_early_return ---
269        let eval_res = nu_engine::eval_block_with_early_return::<WithoutDebug>(
270            &self.state,
271            &mut stack,
272            block,
273            eval_pipeline_input,
274        );
275
276        // -- merge env, restore job, cleanup job (boilerplate, same as before) ---
277        if eval_res.is_ok() {
278            if let Err(e) = self.state.merge_env(&mut stack) {
279                tracing::error!(
280                    "Failed to merge environment from job '{}': {}",
281                    job_display_name,
282                    e
283                );
284            }
285        }
286
287        self.state.current_job.background_thread_job = saved_bg_job;
288        eval_res.map(|exec_data| exec_data.body).map_err(Box::new)
289    }
290
291    /// Kill the background ThreadJob whose name equals `name`.
292    pub fn kill_job_by_name(&self, name: &str) {
293        if let Ok(mut jobs) = self.state.jobs.lock() {
294            let job_id = {
295                jobs.iter().find_map(|(jid, job)| {
296                    job.description()
297                        .and_then(|desc| if desc == name { Some(jid) } else { None })
298                })
299            };
300            if let Some(job_id) = job_id {
301                let _ = jobs.kill_and_remove(job_id);
302            }
303        }
304    }
305}
306
307/// Add core cross.stream commands that are common across all Nushell pipeline runners
308pub fn add_core_commands(engine: &mut Engine, store: &Store) -> Result<(), Error> {
309    engine.add_commands(vec![
310        Box::new(commands::cas_command::CasCommand::new(store.clone())),
311        Box::new(commands::get_command::GetCommand::new(store.clone())),
312        Box::new(commands::remove_command::RemoveCommand::new(store.clone())),
313        Box::new(commands::scru128_command::Scru128Command::new()),
314    ])
315}
316
/// Which `.cat`/`.last` flavor a pipeline runner exposes.
///
/// The type is a plain, field-less selector, so it is `Copy` and can be
/// compared and debug-printed freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadMode {
    /// Streaming readers that support `--follow` (eval, actions, services).
    Stream,
    /// Collected readers without follow (actors).
    Plain,
}
324
325/// How `.append` behaves. This is the one axis of the store surface that
326/// genuinely varies by runner.
327pub enum AppendMode {
328    /// Write straight to the store, tagging each frame's metadata with `base_meta`.
329    Direct(JsonValue),
330    /// Buffer appended frames into `output` for the caller to flush (actors).
331    Buffered(Arc<Mutex<Vec<Frame>>>),
332}
333
334/// Register the `.cat` and `.last` read commands for a pipeline runner.
335pub fn add_read_commands(engine: &mut Engine, store: &Store, mode: ReadMode) -> Result<(), Error> {
336    match mode {
337        ReadMode::Stream => engine.add_commands(vec![
338            Box::new(commands::cat_stream_command::CatStreamCommand::new(
339                store.clone(),
340            )),
341            Box::new(commands::last_stream_command::LastStreamCommand::new(
342                store.clone(),
343            )),
344        ]),
345        ReadMode::Plain => engine.add_commands(vec![
346            Box::new(commands::cat_command::CatCommand::new(store.clone())),
347            Box::new(commands::last_command::LastCommand::new(store.clone())),
348        ]),
349    }
350}
351
352/// Register the write surface for a pipeline runner: `.append` (per `mode`) plus
353/// `.import` and `.cas-post`, which are identical across runners.
354pub fn add_write_commands(
355    engine: &mut Engine,
356    store: &Store,
357    mode: AppendMode,
358) -> Result<(), Error> {
359    engine.add_commands(vec![
360        Box::new(commands::import_command::ImportCommand::new(store.clone())),
361        Box::new(commands::cas_post_command::CasPostCommand::new(
362            store.clone(),
363        )),
364    ])?;
365    match mode {
366        AppendMode::Direct(base_meta) => engine.add_commands(vec![Box::new(
367            commands::append_command::AppendCommand::new(store.clone(), base_meta),
368        )]),
369        AppendMode::Buffered(output) => engine.add_commands(vec![Box::new(
370            commands::append_command_buffered::AppendCommand::new(store.clone(), output),
371        )]),
372    }
373}