1use nu_cli::{add_cli_context, gather_parent_env_vars};
2use nu_cmd_lang::create_default_context;
3use nu_command::add_shell_command_context;
4use nu_engine::eval_block_with_early_return;
5use nu_parser::parse;
6use nu_protocol::debugger::WithoutDebug;
7use nu_protocol::engine::{Closure, Command, EngineState, Redirection, Stack, StateWorkingSet};
8use nu_protocol::engine::{Job, ThreadJob};
9use nu_protocol::shell_error::generic::GenericError;
10use nu_protocol::{OutDest, PipelineData, ShellError, Span, Value};
11
12use std::sync::{Arc, Mutex};
13
14use serde_json::Value as JsonValue;
15
16use crate::error::Error;
17use crate::nu::commands;
18use crate::store::{Frame, Store};
19
20#[derive(Clone)]
21pub struct Engine {
22 pub state: EngineState,
23}
24
25impl Engine {
26 pub fn new() -> Result<Self, Error> {
27 let mut engine_state = create_default_context();
28 engine_state = add_shell_command_context(engine_state);
29 engine_state = add_cli_context(engine_state);
30 nu_std::load_standard_library(&mut engine_state)?;
31
32 let init_cwd = std::env::current_dir()?;
33 gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
34
35 Ok(Self {
36 state: engine_state,
37 })
38 }
39
40 pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
41 let mut working_set = StateWorkingSet::new(&self.state);
42 for command in commands {
43 working_set.add_decl(command);
44 }
45 self.state.merge_delta(working_set.render())?;
46 Ok(())
47 }
48
49 pub fn add_alias(&mut self, name: &str, target: &str) -> Result<(), Error> {
50 let mut working_set = StateWorkingSet::new(&self.state);
51 let _ = parse(
52 &mut working_set,
53 None,
54 format!("alias {name} = {target}").as_bytes(),
55 false,
56 );
57 self.state.merge_delta(working_set.render())?;
58 Ok(())
59 }
60
61 pub fn eval(&self, input: PipelineData, expression: String) -> Result<PipelineData, String> {
62 let mut working_set = StateWorkingSet::new(&self.state);
63 let block = parse(&mut working_set, None, expression.as_bytes(), false);
64
65 if !working_set.parse_errors.is_empty() {
66 let first_error = &working_set.parse_errors[0];
67 let formatted = nu_protocol::format_cli_error(None, &working_set, first_error, None);
68 return Err(formatted);
69 }
70
71 let mut engine_state = self.state.clone();
72 engine_state
73 .merge_delta(working_set.render())
74 .map_err(|e| {
75 let working_set = StateWorkingSet::new(&self.state);
76 nu_protocol::format_cli_error(None, &working_set, &e, None)
77 })?;
78
79 let mut stack = Stack::new();
80 let mut stack =
81 stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
82
83 eval_block_with_early_return::<WithoutDebug>(&engine_state, &mut stack, &block, input)
84 .map(|exec_data| exec_data.body)
85 .map_err(|e| {
86 let working_set = StateWorkingSet::new(&engine_state);
87 nu_protocol::format_cli_error(None, &working_set, &e, None)
88 })
89 }
90
91 pub fn parse_closure(&mut self, script: &str) -> Result<Closure, Box<ShellError>> {
92 let mut working_set = StateWorkingSet::new(&self.state);
93 let block = parse(&mut working_set, None, script.as_bytes(), false);
94 self.state
95 .merge_delta(working_set.render())
96 .map_err(Box::new)?;
97
98 let mut stack = Stack::new();
99 let result = eval_block_with_early_return::<WithoutDebug>(
100 &self.state,
101 &mut stack,
102 &block,
103 PipelineData::empty(),
104 )
105 .map_err(Box::new)?;
106 let closure = result
107 .body
108 .into_value(Span::unknown())
109 .map_err(Box::new)?
110 .into_closure()
111 .map_err(Box::new)?;
112
113 self.state.merge_env(&mut stack).map_err(Box::new)?;
114
115 Ok(closure)
116 }
117
118 pub fn add_module(&mut self, name: &str, content: &str) -> Result<(), Box<ShellError>> {
119 let mut working_set = StateWorkingSet::new(&self.state);
120
121 let temp_dir = tempfile::TempDir::new().map_err(|e| {
123 Box::new(ShellError::Generic(GenericError::new_internal(
124 "I/O Error",
125 format!("Failed to create temporary directory for module '{name}': {e}"),
126 )))
127 })?;
128 let module_path = temp_dir.path().join(format!("{name}.nu"));
129 std::fs::write(&module_path, content).map_err(|e| {
130 Box::new(ShellError::Generic(GenericError::new_internal(
131 "I/O Error",
132 e.to_string(),
133 )))
134 })?;
135
136 let use_stmt = format!("use {}", module_path.display());
138 let _block = parse(&mut working_set, None, use_stmt.as_bytes(), false);
139
140 if !working_set.parse_errors.is_empty() {
142 let first_error = &working_set.parse_errors[0];
143 return Err(Box::new(ShellError::Generic(GenericError::new(
144 "Parse error",
145 first_error.to_string(),
146 first_error.span(),
147 ))));
148 }
149
150 self.state
152 .merge_delta(working_set.render())
153 .map_err(Box::new)?;
154
155 let mut stack = Stack::new();
157 let _ = eval_block_with_early_return::<WithoutDebug>(
158 &self.state,
159 &mut stack,
160 &_block,
161 PipelineData::empty(),
162 )
163 .map_err(Box::new)?;
164
165 self.state.merge_env(&mut stack).map_err(Box::new)?;
167
168 Ok(())
169 }
170
171 pub fn with_env_vars(
172 mut self,
173 vars: impl IntoIterator<Item = (String, String)>,
174 ) -> Result<Self, Error> {
175 for (key, value) in vars {
176 self.state
177 .add_env_var(key, nu_protocol::Value::string(value, Span::unknown()));
178 }
179
180 Ok(self)
181 }
182
183 pub fn run_closure_in_job(
184 &mut self,
185 closure: &nu_protocol::engine::Closure,
186 args: Vec<Value>,
187 pipeline_input: Option<PipelineData>,
188 job_name: impl Into<String>,
189 ) -> Result<PipelineData, Box<ShellError>> {
190 let job_display_name = job_name.into(); let (sender, _rx) = std::sync::mpsc::channel();
194 let job = ThreadJob::new(
195 self.state.signals().clone(),
196 Some(job_display_name.clone()),
197 sender,
198 );
199 let _job_id = {
200 let mut j = self.state.jobs.lock().unwrap();
201 j.add_job(Job::Thread(job.clone()))
202 };
203
204 let saved_bg_job = self.state.current_job.background_thread_job.clone();
206 self.state.current_job.background_thread_job = Some(job.clone());
207
208 let block = self.state.get_block(closure.block_id);
210 let mut stack = Stack::new();
211 let mut stack =
212 stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
213
214 let num_required = block.signature.required_positional.len();
215 let num_optional = block.signature.optional_positional.len();
216 let total_positional = num_required + num_optional;
217
218 if args.len() > total_positional {
219 return Err(Box::new(ShellError::Generic(GenericError::new(
220 format!(
221 "Too many arguments for job '{job_display_name}': got {}, closure accepts at most {total_positional}.",
222 args.len()
223 ),
224 format!("Closure signature: {name}", name = block.signature.name),
225 block.span.unwrap_or_else(Span::unknown),
226 ))));
227 }
228
229 if args.len() < num_required {
230 return Err(Box::new(ShellError::Generic(GenericError::new(
231 format!(
232 "Job '{job_display_name}' run closure expects {num_required} required argument(s), but {} were provided.",
233 args.len()
234 ),
235 format!("Closure signature: {name}", name = block.signature.name),
236 block.span.unwrap_or_else(Span::unknown),
237 ))));
238 }
239
240 for (i, val) in args.iter().enumerate() {
242 let param = if i < num_required {
243 &block.signature.required_positional[i]
244 } else {
245 &block.signature.optional_positional[i - num_required]
246 };
247 if let Some(var_id) = param.var_id {
248 stack.add_var(var_id, val.clone());
249 }
250 }
251
252 let optional_covered = args.len().saturating_sub(num_required);
254 for i in optional_covered..num_optional {
255 let param = &block.signature.optional_positional[i];
256 if let Some(var_id) = param.var_id {
257 let default = param
258 .default_value
259 .clone()
260 .unwrap_or_else(|| Value::nothing(Span::unknown()));
261 stack.add_var(var_id, default);
262 }
263 }
264
265 let eval_pipeline_input = pipeline_input.unwrap_or_else(PipelineData::empty);
267
268 let eval_res = nu_engine::eval_block_with_early_return::<WithoutDebug>(
270 &self.state,
271 &mut stack,
272 block,
273 eval_pipeline_input,
274 );
275
276 if eval_res.is_ok() {
278 if let Err(e) = self.state.merge_env(&mut stack) {
279 tracing::error!(
280 "Failed to merge environment from job '{}': {}",
281 job_display_name,
282 e
283 );
284 }
285 }
286
287 self.state.current_job.background_thread_job = saved_bg_job;
288 eval_res.map(|exec_data| exec_data.body).map_err(Box::new)
289 }
290
291 pub fn kill_job_by_name(&self, name: &str) {
293 if let Ok(mut jobs) = self.state.jobs.lock() {
294 let job_id = {
295 jobs.iter().find_map(|(jid, job)| {
296 job.description()
297 .and_then(|desc| if desc == name { Some(jid) } else { None })
298 })
299 };
300 if let Some(job_id) = job_id {
301 let _ = jobs.kill_and_remove(job_id);
302 }
303 }
304 }
305}
306
307pub fn add_core_commands(engine: &mut Engine, store: &Store) -> Result<(), Error> {
309 engine.add_commands(vec![
310 Box::new(commands::cas_command::CasCommand::new(store.clone())),
311 Box::new(commands::get_command::GetCommand::new(store.clone())),
312 Box::new(commands::remove_command::RemoveCommand::new(store.clone())),
313 Box::new(commands::scru128_command::Scru128Command::new()),
314 ])
315}
316
317pub enum ReadMode {
319 Stream,
321 Plain,
323}
324
325pub enum AppendMode {
328 Direct(JsonValue),
330 Buffered(Arc<Mutex<Vec<Frame>>>),
332}
333
334pub fn add_read_commands(engine: &mut Engine, store: &Store, mode: ReadMode) -> Result<(), Error> {
336 match mode {
337 ReadMode::Stream => engine.add_commands(vec![
338 Box::new(commands::cat_stream_command::CatStreamCommand::new(
339 store.clone(),
340 )),
341 Box::new(commands::last_stream_command::LastStreamCommand::new(
342 store.clone(),
343 )),
344 ]),
345 ReadMode::Plain => engine.add_commands(vec![
346 Box::new(commands::cat_command::CatCommand::new(store.clone())),
347 Box::new(commands::last_command::LastCommand::new(store.clone())),
348 ]),
349 }
350}
351
352pub fn add_write_commands(
355 engine: &mut Engine,
356 store: &Store,
357 mode: AppendMode,
358) -> Result<(), Error> {
359 engine.add_commands(vec![
360 Box::new(commands::import_command::ImportCommand::new(store.clone())),
361 Box::new(commands::cas_post_command::CasPostCommand::new(
362 store.clone(),
363 )),
364 ])?;
365 match mode {
366 AppendMode::Direct(base_meta) => engine.add_commands(vec![Box::new(
367 commands::append_command::AppendCommand::new(store.clone(), base_meta),
368 )]),
369 AppendMode::Buffered(output) => engine.add_commands(vec![Box::new(
370 commands::append_command_buffered::AppendCommand::new(store.clone(), output),
371 )]),
372 }
373}