Skip to main content

xs/nu/
engine.rs

1use nu_cli::{add_cli_context, gather_parent_env_vars};
2use nu_cmd_lang::create_default_context;
3use nu_command::add_shell_command_context;
4use nu_engine::eval_block_with_early_return;
5use nu_parser::parse;
6use nu_protocol::debugger::WithoutDebug;
7use nu_protocol::engine::{Closure, Command, EngineState, Redirection, Stack, StateWorkingSet};
8use nu_protocol::engine::{Job, ThreadJob};
9use nu_protocol::shell_error::generic::GenericError;
10use nu_protocol::{OutDest, PipelineData, ShellError, Span, Value};
11
12use std::sync::{Arc, Mutex};
13
14use serde_json::Value as JsonValue;
15
16use crate::error::Error;
17use crate::nu::commands;
18use crate::store::{Frame, Store};
19
/// Thin wrapper around a Nushell `EngineState`.
///
/// Cloning an `Engine` clones the wrapped state.
#[derive(Clone)]
pub struct Engine {
    /// The wrapped Nushell engine state; the methods on `Engine` read and mutate it.
    pub state: EngineState,
}
24
25impl Engine {
26    pub fn new() -> Result<Self, Error> {
27        let mut engine_state = create_default_context();
28        engine_state = add_shell_command_context(engine_state);
29        engine_state = add_cli_context(engine_state);
30        nu_std::load_standard_library(&mut engine_state)?;
31
32        let init_cwd = std::env::current_dir()?;
33        gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
34
35        Ok(Self {
36            state: engine_state,
37        })
38    }
39
40    pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
41        let mut working_set = StateWorkingSet::new(&self.state);
42        for command in commands {
43            working_set.add_decl(command);
44        }
45        self.state.merge_delta(working_set.render())?;
46        Ok(())
47    }
48
49    pub fn add_alias(&mut self, name: &str, target: &str) -> Result<(), Error> {
50        let mut working_set = StateWorkingSet::new(&self.state);
51        let _ = parse(
52            &mut working_set,
53            None,
54            format!("alias {name} = {target}").as_bytes(),
55            false,
56        );
57        self.state.merge_delta(working_set.render())?;
58        Ok(())
59    }
60
61    pub fn eval(&self, input: PipelineData, expression: String) -> Result<PipelineData, String> {
62        let mut working_set = StateWorkingSet::new(&self.state);
63        let block = parse(&mut working_set, None, expression.as_bytes(), false);
64
65        if !working_set.parse_errors.is_empty() {
66            let first_error = &working_set.parse_errors[0];
67            let formatted = nu_protocol::format_cli_error(None, &working_set, first_error, None);
68            return Err(formatted);
69        }
70
71        let mut engine_state = self.state.clone();
72        engine_state
73            .merge_delta(working_set.render())
74            .map_err(|e| {
75                let working_set = StateWorkingSet::new(&self.state);
76                nu_protocol::format_cli_error(None, &working_set, &e, None)
77            })?;
78
79        let mut stack = Stack::new();
80        let mut stack =
81            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
82
83        eval_block_with_early_return::<WithoutDebug>(&engine_state, &mut stack, &block, input)
84            .map(|exec_data| exec_data.body)
85            .map_err(|e| {
86                let working_set = StateWorkingSet::new(&engine_state);
87                nu_protocol::format_cli_error(None, &working_set, &e, None)
88            })
89    }
90
91    pub fn parse_closure(&mut self, script: &str) -> Result<Closure, Box<ShellError>> {
92        let mut working_set = StateWorkingSet::new(&self.state);
93        let block = parse(&mut working_set, None, script.as_bytes(), false);
94        self.state
95            .merge_delta(working_set.render())
96            .map_err(Box::new)?;
97
98        let mut stack = Stack::new();
99        let result = eval_block_with_early_return::<WithoutDebug>(
100            &self.state,
101            &mut stack,
102            &block,
103            PipelineData::empty(),
104        )
105        .map_err(Box::new)?;
106        let closure = result
107            .body
108            .into_value(Span::unknown())
109            .map_err(Box::new)?
110            .into_closure()
111            .map_err(Box::new)?;
112
113        self.state.merge_env(&mut stack).map_err(Box::new)?;
114
115        Ok(closure)
116    }
117
118    pub fn add_module(&mut self, name: &str, content: &str) -> Result<(), Box<ShellError>> {
119        let mut working_set = StateWorkingSet::new(&self.state);
120
121        // Create temporary file with .nu extension that will be cleaned up when temp_dir is dropped
122        let temp_dir = tempfile::TempDir::new().map_err(|e| {
123            Box::new(ShellError::Generic(GenericError::new_internal(
124                "I/O Error",
125                format!("Failed to create temporary directory for module '{name}': {e}"),
126            )))
127        })?;
128        let module_path = temp_dir.path().join(format!("{name}.nu"));
129        std::fs::write(&module_path, content).map_err(|e| {
130            Box::new(ShellError::Generic(GenericError::new_internal(
131                "I/O Error",
132                e.to_string(),
133            )))
134        })?;
135
136        // Parse the use statement
137        let use_stmt = format!("use {}", module_path.display());
138        let _block = parse(&mut working_set, None, use_stmt.as_bytes(), false);
139
140        // Check for parse errors
141        if !working_set.parse_errors.is_empty() {
142            let first_error = &working_set.parse_errors[0];
143            return Err(Box::new(ShellError::Generic(GenericError::new(
144                "Parse error",
145                first_error.to_string(),
146                first_error.span(),
147            ))));
148        }
149
150        // Merge changes into engine state
151        self.state
152            .merge_delta(working_set.render())
153            .map_err(Box::new)?;
154
155        // Create a temporary stack and evaluate the module
156        let mut stack = Stack::new();
157        let _ = eval_block_with_early_return::<WithoutDebug>(
158            &self.state,
159            &mut stack,
160            &_block,
161            PipelineData::empty(),
162        )
163        .map_err(Box::new)?;
164
165        // Merge environment variables into engine state
166        self.state.merge_env(&mut stack).map_err(Box::new)?;
167
168        Ok(())
169    }
170
171    pub fn with_env_vars(
172        mut self,
173        vars: impl IntoIterator<Item = (String, String)>,
174    ) -> Result<Self, Error> {
175        for (key, value) in vars {
176            self.state
177                .add_env_var(key, nu_protocol::Value::string(value, Span::unknown()));
178        }
179
180        Ok(self)
181    }
182
183    /// Set the base metadata stamped on frames this engine's `.append` writes
184    /// (`service_id`, `{action_id, frame_id}`, ...). Injected as `$env` so the
185    /// `.append` decl stays instance-independent. See `append_command`.
186    pub fn set_append_meta(&mut self, meta: &JsonValue) {
187        self.state.add_env_var(
188            crate::nu::commands::append_command::APPEND_META_ENV.to_string(),
189            Value::string(meta.to_string(), Span::unknown()),
190        );
191    }
192
193    pub fn run_closure_in_job(
194        &mut self,
195        closure: &nu_protocol::engine::Closure,
196        args: Vec<Value>,
197        pipeline_input: Option<PipelineData>,
198        job_name: impl Into<String>,
199    ) -> Result<PipelineData, Box<ShellError>> {
200        let job_display_name = job_name.into(); // Convert job_name early for error messages
201
202        // -- create & register job (boilerplate) ---
203        let (sender, _rx) = std::sync::mpsc::channel();
204        let job = ThreadJob::new(
205            self.state.signals().clone(),
206            Some(job_display_name.clone()),
207            sender,
208        );
209        let _job_id = {
210            let mut j = self.state.jobs.lock().unwrap();
211            j.add_job(Job::Thread(job.clone()))
212        };
213
214        // -- temporarily attach the job to self.state (boilerplate) ---
215        let saved_bg_job = self.state.current_job.background_thread_job.clone();
216        self.state.current_job.background_thread_job = Some(job.clone());
217
218        // -- prepare stack & validate/inject positional arguments ---
219        let block = self.state.get_block(closure.block_id);
220        let mut stack = Stack::new();
221        let mut stack =
222            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
223
224        let num_required = block.signature.required_positional.len();
225        let num_optional = block.signature.optional_positional.len();
226        let total_positional = num_required + num_optional;
227
228        if args.len() > total_positional {
229            return Err(Box::new(ShellError::Generic(GenericError::new(
230                format!(
231                    "Too many arguments for job '{job_display_name}': got {}, closure accepts at most {total_positional}.",
232                    args.len()
233                ),
234                format!("Closure signature: {name}", name = block.signature.name),
235                block.span.unwrap_or_else(Span::unknown),
236            ))));
237        }
238
239        if args.len() < num_required {
240            return Err(Box::new(ShellError::Generic(GenericError::new(
241                format!(
242                    "Job '{job_display_name}' run closure expects {num_required} required argument(s), but {} were provided.",
243                    args.len()
244                ),
245                format!("Closure signature: {name}", name = block.signature.name),
246                block.span.unwrap_or_else(Span::unknown),
247            ))));
248        }
249
250        // Inject provided positional args
251        for (i, val) in args.iter().enumerate() {
252            let param = if i < num_required {
253                &block.signature.required_positional[i]
254            } else {
255                &block.signature.optional_positional[i - num_required]
256            };
257            if let Some(var_id) = param.var_id {
258                stack.add_var(var_id, val.clone());
259            }
260        }
261
262        // Set default values for optional params not covered by provided args
263        let optional_covered = args.len().saturating_sub(num_required);
264        for i in optional_covered..num_optional {
265            let param = &block.signature.optional_positional[i];
266            if let Some(var_id) = param.var_id {
267                let default = param
268                    .default_value
269                    .clone()
270                    .unwrap_or_else(|| Value::nothing(Span::unknown()));
271                stack.add_var(var_id, default);
272            }
273        }
274
275        // Determine the actual pipeline input for eval_block_with_early_return
276        let eval_pipeline_input = pipeline_input.unwrap_or_else(PipelineData::empty);
277
278        // -- run using eval_block_with_early_return ---
279        let eval_res = nu_engine::eval_block_with_early_return::<WithoutDebug>(
280            &self.state,
281            &mut stack,
282            block,
283            eval_pipeline_input,
284        );
285
286        // -- merge env, restore job, cleanup job (boilerplate, same as before) ---
287        if eval_res.is_ok() {
288            if let Err(e) = self.state.merge_env(&mut stack) {
289                tracing::error!(
290                    "Failed to merge environment from job '{}': {}",
291                    job_display_name,
292                    e
293                );
294            }
295        }
296
297        self.state.current_job.background_thread_job = saved_bg_job;
298        eval_res.map(|exec_data| exec_data.body).map_err(Box::new)
299    }
300
301    /// Kill the background ThreadJob whose name equals `name`.
302    pub fn kill_job_by_name(&self, name: &str) {
303        if let Ok(mut jobs) = self.state.jobs.lock() {
304            let job_id = {
305                jobs.iter().find_map(|(jid, job)| {
306                    job.description()
307                        .and_then(|desc| if desc == name { Some(jid) } else { None })
308                })
309            };
310            if let Some(job_id) = job_id {
311                let _ = jobs.kill_and_remove(job_id);
312            }
313        }
314    }
315}
316
317/// Add core cross.stream commands that are common across all Nushell pipeline runners
318pub fn add_core_commands(engine: &mut Engine, store: &Store) -> Result<(), Error> {
319    engine.add_commands(vec![
320        Box::new(commands::cas_command::CasCommand::new(store.clone())),
321        Box::new(commands::get_command::GetCommand::new(store.clone())),
322        Box::new(commands::remove_command::RemoveCommand::new(store.clone())),
323        Box::new(commands::scru128_command::Scru128Command::new()),
324    ])
325}
326
/// Which `.cat`/`.last` flavor a pipeline runner exposes.
///
/// This is a plain two-state selector, so it derives `Copy`, `Debug` and equality:
/// callers can log it, compare it, and keep using a value after passing it on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadMode {
    /// Streaming readers that support `--follow` (eval, actions, services).
    Stream,
    /// Collected readers without follow (actors).
    Plain,
}
334
/// How `.append` behaves. This is the one store command whose behaviour
/// genuinely varies by runner.
///
/// Passed to `add_write_commands`, which registers the matching `.append`
/// implementation.
pub enum AppendMode {
    /// Write straight to the store. The per-instance base metadata is injected
    /// via the `XS_APPEND_META` env var (see `append_command`), not baked in.
    Direct,
    /// Buffer appended frames into `output` for the caller to flush (actors).
    /// The shared, mutex-guarded vector is handed to the buffered `.append` command.
    Buffered(Arc<Mutex<Vec<Frame>>>),
}
344
345/// Register the `.cat` and `.last` read builtins for a pipeline runner.
346pub fn add_read_commands(engine: &mut Engine, store: &Store, mode: ReadMode) -> Result<(), Error> {
347    match mode {
348        ReadMode::Stream => engine.add_commands(vec![
349            Box::new(commands::cat_stream_command::CatStreamCommand::new(
350                store.clone(),
351            )),
352            Box::new(commands::last_stream_command::LastStreamCommand::new(
353                store.clone(),
354            )),
355        ]),
356        ReadMode::Plain => engine.add_commands(vec![
357            Box::new(commands::cat_command::CatCommand::new(store.clone())),
358            Box::new(commands::last_command::LastCommand::new(store.clone())),
359        ]),
360    }
361}
362
363/// Register the write builtins for a pipeline runner: `.append` (per `mode`)
364/// plus `.import` and `.cas-post`, which are identical across runners.
365pub fn add_write_commands(
366    engine: &mut Engine,
367    store: &Store,
368    mode: AppendMode,
369) -> Result<(), Error> {
370    engine.add_commands(vec![
371        Box::new(commands::import_command::ImportCommand::new(store.clone())),
372        Box::new(commands::cas_post_command::CasPostCommand::new(
373            store.clone(),
374        )),
375    ])?;
376    match mode {
377        AppendMode::Direct => engine.add_commands(vec![Box::new(
378            commands::append_command::AppendCommand::new(store.clone()),
379        )]),
380        AppendMode::Buffered(output) => engine.add_commands(vec![Box::new(
381            commands::append_command_buffered::AppendCommand::new(store.clone(), output),
382        )]),
383    }
384}
385
386/// The module-free, instance-free base engine for a runner: nushell + stdlib +
387/// the core builtins + the `.rm` alias + the read builtins, plus the `.append`
388/// write builtin for direct writers. Build this once per runner and `clone()` it per
389/// spawn or restart; `load_modules(as_of)` and `set_append_meta(..)` specialize
390/// each clone. Actors pass `direct_write: false` and add their per-instance
391/// buffered `.append` to the clone.
392pub fn prepared_base(store: &Store, read: ReadMode, direct_write: bool) -> Result<Engine, Error> {
393    let mut engine = Engine::new()?;
394    add_core_commands(&mut engine, store)?;
395    engine.add_alias(".rm", ".remove")?;
396    add_read_commands(&mut engine, store, read)?;
397    if direct_write {
398        add_write_commands(&mut engine, store, AppendMode::Direct)?;
399    }
400    Ok(engine)
401}