1use nu_cli::{add_cli_context, gather_parent_env_vars};
2use nu_cmd_lang::create_default_context;
3use nu_command::add_shell_command_context;
4use nu_engine::eval_block_with_early_return;
5use nu_parser::parse;
6use nu_protocol::debugger::WithoutDebug;
7use nu_protocol::engine::{Closure, Command, EngineState, Redirection, Stack, StateWorkingSet};
8use nu_protocol::engine::{Job, ThreadJob};
9use nu_protocol::shell_error::generic::GenericError;
10use nu_protocol::{OutDest, PipelineData, ShellError, Span, Value};
11
12use std::sync::{Arc, Mutex};
13
14use serde_json::Value as JsonValue;
15
16use crate::error::Error;
17use crate::nu::commands;
18use crate::store::{Frame, Store};
19
20#[derive(Clone)]
21pub struct Engine {
22 pub state: EngineState,
23}
24
25impl Engine {
26 pub fn new() -> Result<Self, Error> {
27 let mut engine_state = create_default_context();
28 engine_state = add_shell_command_context(engine_state);
29 engine_state = add_cli_context(engine_state);
30 nu_std::load_standard_library(&mut engine_state)?;
31
32 let init_cwd = std::env::current_dir()?;
33 gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
34
35 Ok(Self {
36 state: engine_state,
37 })
38 }
39
40 pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
41 let mut working_set = StateWorkingSet::new(&self.state);
42 for command in commands {
43 working_set.add_decl(command);
44 }
45 self.state.merge_delta(working_set.render())?;
46 Ok(())
47 }
48
49 pub fn add_alias(&mut self, name: &str, target: &str) -> Result<(), Error> {
50 let mut working_set = StateWorkingSet::new(&self.state);
51 let _ = parse(
52 &mut working_set,
53 None,
54 format!("alias {name} = {target}").as_bytes(),
55 false,
56 );
57 self.state.merge_delta(working_set.render())?;
58 Ok(())
59 }
60
61 pub fn eval(&self, input: PipelineData, expression: String) -> Result<PipelineData, String> {
62 let mut working_set = StateWorkingSet::new(&self.state);
63 let block = parse(&mut working_set, None, expression.as_bytes(), false);
64
65 if !working_set.parse_errors.is_empty() {
66 let first_error = &working_set.parse_errors[0];
67 let formatted = nu_protocol::format_cli_error(None, &working_set, first_error, None);
68 return Err(formatted);
69 }
70
71 let mut engine_state = self.state.clone();
72 engine_state
73 .merge_delta(working_set.render())
74 .map_err(|e| {
75 let working_set = StateWorkingSet::new(&self.state);
76 nu_protocol::format_cli_error(None, &working_set, &e, None)
77 })?;
78
79 let mut stack = Stack::new();
80 let mut stack =
81 stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
82
83 eval_block_with_early_return::<WithoutDebug>(&engine_state, &mut stack, &block, input)
84 .map(|exec_data| exec_data.body)
85 .map_err(|e| {
86 let working_set = StateWorkingSet::new(&engine_state);
87 nu_protocol::format_cli_error(None, &working_set, &e, None)
88 })
89 }
90
91 pub fn parse_closure(&mut self, script: &str) -> Result<Closure, Box<ShellError>> {
92 let mut working_set = StateWorkingSet::new(&self.state);
93 let block = parse(&mut working_set, None, script.as_bytes(), false);
94 self.state
95 .merge_delta(working_set.render())
96 .map_err(Box::new)?;
97
98 let mut stack = Stack::new();
99 let result = eval_block_with_early_return::<WithoutDebug>(
100 &self.state,
101 &mut stack,
102 &block,
103 PipelineData::empty(),
104 )
105 .map_err(Box::new)?;
106 let closure = result
107 .body
108 .into_value(Span::unknown())
109 .map_err(Box::new)?
110 .into_closure()
111 .map_err(Box::new)?;
112
113 self.state.merge_env(&mut stack).map_err(Box::new)?;
114
115 Ok(closure)
116 }
117
118 pub fn add_module(&mut self, name: &str, content: &str) -> Result<(), Box<ShellError>> {
119 let mut working_set = StateWorkingSet::new(&self.state);
120
121 let temp_dir = tempfile::TempDir::new().map_err(|e| {
123 Box::new(ShellError::Generic(GenericError::new_internal(
124 "I/O Error",
125 format!("Failed to create temporary directory for module '{name}': {e}"),
126 )))
127 })?;
128 let module_path = temp_dir.path().join(format!("{name}.nu"));
129 std::fs::write(&module_path, content).map_err(|e| {
130 Box::new(ShellError::Generic(GenericError::new_internal(
131 "I/O Error",
132 e.to_string(),
133 )))
134 })?;
135
136 let use_stmt = format!("use {}", module_path.display());
138 let _block = parse(&mut working_set, None, use_stmt.as_bytes(), false);
139
140 if !working_set.parse_errors.is_empty() {
142 let first_error = &working_set.parse_errors[0];
143 return Err(Box::new(ShellError::Generic(GenericError::new(
144 "Parse error",
145 first_error.to_string(),
146 first_error.span(),
147 ))));
148 }
149
150 self.state
152 .merge_delta(working_set.render())
153 .map_err(Box::new)?;
154
155 let mut stack = Stack::new();
157 let _ = eval_block_with_early_return::<WithoutDebug>(
158 &self.state,
159 &mut stack,
160 &_block,
161 PipelineData::empty(),
162 )
163 .map_err(Box::new)?;
164
165 self.state.merge_env(&mut stack).map_err(Box::new)?;
167
168 Ok(())
169 }
170
171 pub fn with_env_vars(
172 mut self,
173 vars: impl IntoIterator<Item = (String, String)>,
174 ) -> Result<Self, Error> {
175 for (key, value) in vars {
176 self.state
177 .add_env_var(key, nu_protocol::Value::string(value, Span::unknown()));
178 }
179
180 Ok(self)
181 }
182
183 pub fn set_append_meta(&mut self, meta: &JsonValue) {
187 self.state.add_env_var(
188 crate::nu::commands::append_command::APPEND_META_ENV.to_string(),
189 Value::string(meta.to_string(), Span::unknown()),
190 );
191 }
192
193 pub fn run_closure_in_job(
194 &mut self,
195 closure: &nu_protocol::engine::Closure,
196 args: Vec<Value>,
197 pipeline_input: Option<PipelineData>,
198 job_name: impl Into<String>,
199 ) -> Result<PipelineData, Box<ShellError>> {
200 let job_display_name = job_name.into(); let (sender, _rx) = std::sync::mpsc::channel();
204 let job = ThreadJob::new(
205 self.state.signals().clone(),
206 Some(job_display_name.clone()),
207 sender,
208 );
209 let _job_id = {
210 let mut j = self.state.jobs.lock().unwrap();
211 j.add_job(Job::Thread(job.clone()))
212 };
213
214 let saved_bg_job = self.state.current_job.background_thread_job.clone();
216 self.state.current_job.background_thread_job = Some(job.clone());
217
218 let block = self.state.get_block(closure.block_id);
220 let mut stack = Stack::new();
221 let mut stack =
222 stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
223
224 let num_required = block.signature.required_positional.len();
225 let num_optional = block.signature.optional_positional.len();
226 let total_positional = num_required + num_optional;
227
228 if args.len() > total_positional {
229 return Err(Box::new(ShellError::Generic(GenericError::new(
230 format!(
231 "Too many arguments for job '{job_display_name}': got {}, closure accepts at most {total_positional}.",
232 args.len()
233 ),
234 format!("Closure signature: {name}", name = block.signature.name),
235 block.span.unwrap_or_else(Span::unknown),
236 ))));
237 }
238
239 if args.len() < num_required {
240 return Err(Box::new(ShellError::Generic(GenericError::new(
241 format!(
242 "Job '{job_display_name}' run closure expects {num_required} required argument(s), but {} were provided.",
243 args.len()
244 ),
245 format!("Closure signature: {name}", name = block.signature.name),
246 block.span.unwrap_or_else(Span::unknown),
247 ))));
248 }
249
250 for (i, val) in args.iter().enumerate() {
252 let param = if i < num_required {
253 &block.signature.required_positional[i]
254 } else {
255 &block.signature.optional_positional[i - num_required]
256 };
257 if let Some(var_id) = param.var_id {
258 stack.add_var(var_id, val.clone());
259 }
260 }
261
262 let optional_covered = args.len().saturating_sub(num_required);
264 for i in optional_covered..num_optional {
265 let param = &block.signature.optional_positional[i];
266 if let Some(var_id) = param.var_id {
267 let default = param
268 .default_value
269 .clone()
270 .unwrap_or_else(|| Value::nothing(Span::unknown()));
271 stack.add_var(var_id, default);
272 }
273 }
274
275 let eval_pipeline_input = pipeline_input.unwrap_or_else(PipelineData::empty);
277
278 let eval_res = nu_engine::eval_block_with_early_return::<WithoutDebug>(
280 &self.state,
281 &mut stack,
282 block,
283 eval_pipeline_input,
284 );
285
286 if eval_res.is_ok() {
288 if let Err(e) = self.state.merge_env(&mut stack) {
289 tracing::error!(
290 "Failed to merge environment from job '{}': {}",
291 job_display_name,
292 e
293 );
294 }
295 }
296
297 self.state.current_job.background_thread_job = saved_bg_job;
298 eval_res.map(|exec_data| exec_data.body).map_err(Box::new)
299 }
300
301 pub fn kill_job_by_name(&self, name: &str) {
303 if let Ok(mut jobs) = self.state.jobs.lock() {
304 let job_id = {
305 jobs.iter().find_map(|(jid, job)| {
306 job.description()
307 .and_then(|desc| if desc == name { Some(jid) } else { None })
308 })
309 };
310 if let Some(job_id) = job_id {
311 let _ = jobs.kill_and_remove(job_id);
312 }
313 }
314 }
315}
316
317pub fn add_core_commands(engine: &mut Engine, store: &Store) -> Result<(), Error> {
319 engine.add_commands(vec![
320 Box::new(commands::cas_command::CasCommand::new(store.clone())),
321 Box::new(commands::get_command::GetCommand::new(store.clone())),
322 Box::new(commands::remove_command::RemoveCommand::new(store.clone())),
323 Box::new(commands::scru128_command::Scru128Command::new()),
324 ])
325}
326
/// Selects which pair of read commands `add_read_commands` registers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadMode {
    /// Register the streaming variants (`CatStreamCommand`, `LastStreamCommand`).
    Stream,
    /// Register the plain variants (`CatCommand`, `LastCommand`).
    Plain,
}
334
335pub enum AppendMode {
338 Direct,
341 Buffered(Arc<Mutex<Vec<Frame>>>),
343}
344
345pub fn add_read_commands(engine: &mut Engine, store: &Store, mode: ReadMode) -> Result<(), Error> {
347 match mode {
348 ReadMode::Stream => engine.add_commands(vec![
349 Box::new(commands::cat_stream_command::CatStreamCommand::new(
350 store.clone(),
351 )),
352 Box::new(commands::last_stream_command::LastStreamCommand::new(
353 store.clone(),
354 )),
355 ]),
356 ReadMode::Plain => engine.add_commands(vec![
357 Box::new(commands::cat_command::CatCommand::new(store.clone())),
358 Box::new(commands::last_command::LastCommand::new(store.clone())),
359 ]),
360 }
361}
362
363pub fn add_write_commands(
366 engine: &mut Engine,
367 store: &Store,
368 mode: AppendMode,
369) -> Result<(), Error> {
370 engine.add_commands(vec![
371 Box::new(commands::import_command::ImportCommand::new(store.clone())),
372 Box::new(commands::cas_post_command::CasPostCommand::new(
373 store.clone(),
374 )),
375 ])?;
376 match mode {
377 AppendMode::Direct => engine.add_commands(vec![Box::new(
378 commands::append_command::AppendCommand::new(store.clone()),
379 )]),
380 AppendMode::Buffered(output) => engine.add_commands(vec![Box::new(
381 commands::append_command_buffered::AppendCommand::new(store.clone(), output),
382 )]),
383 }
384}
385
386pub fn prepared_base(store: &Store, read: ReadMode, direct_write: bool) -> Result<Engine, Error> {
393 let mut engine = Engine::new()?;
394 add_core_commands(&mut engine, store)?;
395 engine.add_alias(".rm", ".remove")?;
396 add_read_commands(&mut engine, store, read)?;
397 if direct_write {
398 add_write_commands(&mut engine, store, AppendMode::Direct)?;
399 }
400 Ok(engine)
401}