1use std::path::Path;
2use std::sync::{atomic::AtomicBool, Arc};
3
4use tokio_util::sync::CancellationToken;
5
6use nu_cli::{add_cli_context, gather_parent_env_vars};
7use nu_cmd_lang::create_default_context;
8use nu_command::add_shell_command_context;
9use nu_engine::eval_block_with_early_return;
10use nu_parser::parse;
11use nu_plugin_engine::{GetPlugin, PluginDeclaration};
12use nu_protocol::engine::Command;
13use nu_protocol::format_cli_error;
14use nu_protocol::{
15 debugger::WithoutDebug,
16 engine::{Closure, EngineState, Redirection, Stack, StateWorkingSet},
17 shell_error::generic::GenericError,
18 OutDest, PipelineData, PluginIdentity, RegisteredPlugin, ShellError, Signals, Span, Type,
19 Value,
20};
21
22use crate::bus::Bus;
23use crate::commands::{
24 BusPubCommand, BusSubCommand, HighlightCommand, HighlightLangCommand, HighlightThemeCommand,
25 MdCommand, MjCommand, MjCompileCommand, MjRenderCommand, PrintCommand, ReverseProxyCommand,
26 RunNuCommand, StaticCommand, ToSse,
27};
28use crate::logging::log_error;
29use crate::stdlib::load_http_nu_stdlib;
30use crate::Error;
31
/// Runtime options that are surfaced to scripts as the `$HTTP_NU` constant.
///
/// Each field is copied into the record built by `Engine::set_http_nu_const`
/// under a key with the same name: booleans as booleans, optional strings as
/// a string when set and `nothing` when unset.
///
/// `Debug` is derived so the options can be logged and inspected in tests.
#[derive(Clone, Debug, Default)]
pub struct HttpNuOptions {
    /// Exposed as `$HTTP_NU.dev`.
    pub dev: bool,
    /// Exposed as `$HTTP_NU.datastar`.
    pub datastar: bool,
    /// Exposed as `$HTTP_NU.watch`.
    pub watch: bool,
    /// Exposed as `$HTTP_NU.store`; `nothing` when unset.
    pub store: Option<String>,
    /// Exposed as `$HTTP_NU.topic`; `nothing` when unset.
    pub topic: Option<String>,
    /// Exposed as `$HTTP_NU.expose`; `nothing` when unset.
    pub expose: Option<String>,
    /// Exposed as `$HTTP_NU.tls`; `nothing` when unset.
    pub tls: Option<String>,
    /// Exposed as `$HTTP_NU.services`.
    pub services: bool,
}
44
/// A Nushell engine together with the request-handling closure parsed from a script.
///
/// Cloning an `Engine` copies the engine state and closure but shares the
/// same `Bus` (it is held behind an `Arc`).
#[derive(Clone)]
pub struct Engine {
    /// Nushell engine state: declarations, constants and environment are merged into it.
    pub state: EngineState,
    /// Closure produced by `parse_closure`; `None` until a script has been parsed successfully.
    pub closure: Option<Closure>,
    /// Bus handed to the bus pub/sub commands registered by `add_custom_commands`.
    pub bus: Arc<Bus>,
    /// Cancellation token; `script_to_engine` gives every derived engine a fresh one.
    /// NOTE(review): presumably used to cancel SSE streams — the consumers are outside this file.
    pub sse_cancel_token: CancellationToken,
}
54
55impl Engine {
56 pub fn new() -> Result<Self, Error> {
57 let mut engine_state = create_default_context();
58
59 engine_state = add_shell_command_context(engine_state);
60 engine_state = add_cli_context(engine_state);
61 engine_state = nu_cmd_extra::extra::add_extra_command_context(engine_state);
62
63 load_http_nu_stdlib(&mut engine_state)?;
64 nu_std::load_standard_library(&mut engine_state)?;
65
66 let init_cwd = std::env::current_dir()?;
67 gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
68
69 Ok(Self {
70 state: engine_state,
71 closure: None,
72 bus: Arc::new(Bus::new(64)),
73 sse_cancel_token: CancellationToken::new(),
74 })
75 }
76
77 pub fn set_http_nu_const(&mut self, options: &HttpNuOptions) -> Result<(), Error> {
79 let span = Span::unknown();
80 let opt_str = |v: &Option<String>| match v {
81 Some(s) => Value::string(s, span),
82 None => Value::nothing(span),
83 };
84 let record = Value::record(
85 nu_protocol::record! {
86 "dev" => Value::bool(options.dev, span),
87 "datastar" => Value::bool(options.datastar, span),
88 "watch" => Value::bool(options.watch, span),
89 "store" => opt_str(&options.store),
90 "topic" => opt_str(&options.topic),
91 "expose" => opt_str(&options.expose),
92 "tls" => opt_str(&options.tls),
93 "services" => Value::bool(options.services, span),
94 },
95 span,
96 );
97 let mut working_set = StateWorkingSet::new(&self.state);
98 let var_id = working_set.add_variable(b"$HTTP_NU".into(), span, Type::record(), false);
99 working_set.set_variable_const_val(var_id, record);
100 self.state.merge_delta(working_set.render())?;
101 Ok(())
102 }
103
104 pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
105 let mut working_set = StateWorkingSet::new(&self.state);
106 for command in commands {
107 working_set.add_decl(command);
108 }
109 self.state.merge_delta(working_set.render())?;
110 Ok(())
111 }
112
113 pub fn load_plugin(&mut self, path: &Path) -> Result<(), Error> {
115 let path = path.canonicalize().map_err(|e| {
117 Error::from(format!("Failed to canonicalize plugin path {path:?}: {e}"))
118 })?;
119
120 let identity = PluginIdentity::new(&path, None).map_err(|_| {
122 Error::from(format!(
123 "Invalid plugin path {path:?}: must be named nu_plugin_*"
124 ))
125 })?;
126
127 let mut working_set = StateWorkingSet::new(&self.state);
128
129 let plugin = nu_plugin_engine::add_plugin_to_working_set(&mut working_set, &identity)?;
131
132 self.state.merge_delta(working_set.render())?;
134
135 let interface = plugin.clone().get_plugin(None)?;
137
138 plugin.set_metadata(Some(interface.get_metadata()?));
140
141 let mut working_set = StateWorkingSet::new(&self.state);
143 for signature in interface.get_signature()? {
144 let decl = PluginDeclaration::new(plugin.clone(), signature);
145 working_set.add_decl(Box::new(decl));
146 }
147 self.state.merge_delta(working_set.render())?;
148
149 Ok(())
150 }
151
152 pub fn parse_closure(&mut self, script: &str, file: Option<&Path>) -> Result<(), Error> {
153 self.state.file = file.map(|p| p.to_path_buf());
154 let fname = file.map(|p| p.to_string_lossy().into_owned());
155 let mut working_set = StateWorkingSet::new(&self.state);
156 let block = parse(&mut working_set, fname.as_deref(), script.as_bytes(), false);
157
158 if let Some(err) = working_set.parse_errors.first() {
160 let shell_error = ShellError::Generic(GenericError::new(
161 "Parse error",
162 format!("{err:?}"),
163 err.span(),
164 ));
165 return Err(Error::from(format_cli_error(
166 None,
167 &working_set,
168 &shell_error,
169 None,
170 )));
171 }
172
173 if let Some(err) = working_set.compile_errors.first() {
175 let shell_error = ShellError::Generic(GenericError::new_internal(
176 format!("Compile error {err}"),
177 "",
178 ));
179 return Err(Error::from(format_cli_error(
180 None,
181 &working_set,
182 &shell_error,
183 None,
184 )));
185 }
186
187 self.state.merge_delta(working_set.render())?;
188
189 let mut stack = Stack::new();
190 let result = eval_block_with_early_return::<WithoutDebug>(
191 &self.state,
192 &mut stack,
193 &block,
194 PipelineData::empty(),
195 )
196 .map_err(|err| {
197 let working_set = StateWorkingSet::new(&self.state);
198 Error::from(format_cli_error(None, &working_set, &err, None))
199 })?;
200
201 let closure = result
202 .body
203 .into_value(Span::unknown())
204 .map_err(|err| {
205 let working_set = StateWorkingSet::new(&self.state);
206 Error::from(format_cli_error(None, &working_set, &err, None))
207 })?
208 .into_closure()
209 .map_err(|err| {
210 let working_set = StateWorkingSet::new(&self.state);
211 Error::from(format_cli_error(None, &working_set, &err, None))
212 })?;
213
214 let block = self.state.get_block(closure.block_id);
216 if block.signature.required_positional.len() != 1 {
217 return Err(format!(
218 "Closure must accept exactly one request argument, found {}",
219 block.signature.required_positional.len()
220 )
221 .into());
222 }
223
224 self.state.merge_env(&mut stack)?;
225
226 self.closure = Some(closure);
227 Ok(())
228 }
229
230 pub fn set_signals(&mut self, interrupt: Arc<AtomicBool>) {
232 self.state.set_signals(Signals::new(interrupt));
233 }
234
235 pub fn set_lib_dirs(&mut self, paths: &[std::path::PathBuf]) -> Result<(), Error> {
237 if paths.is_empty() {
238 return Ok(());
239 }
240 let span = Span::unknown();
241 let vals: Vec<Value> = paths
242 .iter()
243 .map(|p| Value::string(p.to_string_lossy(), span))
244 .collect();
245
246 let mut working_set = StateWorkingSet::new(&self.state);
247 let var_id = working_set.add_variable(
248 b"$NU_LIB_DIRS".into(),
249 span,
250 Type::List(Box::new(Type::String)),
251 false,
252 );
253 working_set.set_variable_const_val(var_id, Value::list(vals, span));
254 self.state.merge_delta(working_set.render())?;
255 Ok(())
256 }
257
258 pub fn eval(&mut self, script: &str, file: Option<&Path>) -> Result<Value, Error> {
260 self.state.file = file.map(|p| p.to_path_buf());
261 let fname = file.map(|p| p.to_string_lossy().into_owned());
262 let mut working_set = StateWorkingSet::new(&self.state);
263 let block = parse(&mut working_set, fname.as_deref(), script.as_bytes(), false);
264
265 if let Some(err) = working_set.parse_errors.first() {
266 let shell_error = ShellError::Generic(GenericError::new(
267 "Parse error",
268 format!("{err:?}"),
269 err.span(),
270 ));
271 return Err(Error::from(format_cli_error(
272 None,
273 &working_set,
274 &shell_error,
275 None,
276 )));
277 }
278
279 if let Some(err) = working_set.compile_errors.first() {
280 let shell_error = ShellError::Generic(GenericError::new_internal(
281 format!("Compile error {err}"),
282 "",
283 ));
284 return Err(Error::from(format_cli_error(
285 None,
286 &working_set,
287 &shell_error,
288 None,
289 )));
290 }
291
292 let mut engine_state = self.state.clone();
294 engine_state.merge_delta(working_set.render())?;
295
296 let mut stack = Stack::new();
297 let result = eval_block_with_early_return::<WithoutDebug>(
298 &engine_state,
299 &mut stack,
300 &block,
301 PipelineData::empty(),
302 )
303 .map_err(|err| {
304 let working_set = StateWorkingSet::new(&engine_state);
305 Error::from(format_cli_error(None, &working_set, &err, None))
306 })?;
307
308 result.body.into_value(Span::unknown()).map_err(|err| {
309 let working_set = StateWorkingSet::new(&engine_state);
310 Error::from(format_cli_error(None, &working_set, &err, None))
311 })
312 }
313
314 pub fn run_closure(
316 &self,
317 input: Value,
318 pipeline_data: PipelineData,
319 ) -> Result<PipelineData, Error> {
320 let closure = self.closure.as_ref().ok_or("Closure not parsed")?;
321
322 let mut stack = Stack::new().captures_to_stack(closure.captures.clone());
323 let mut stack =
324 stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
325 let block = self.state.get_block(closure.block_id);
326
327 stack.add_var(
328 block.signature.required_positional[0].var_id.unwrap(),
329 input,
330 );
331
332 eval_block_with_early_return::<WithoutDebug>(&self.state, &mut stack, block, pipeline_data)
333 .map(|exec_data| exec_data.body)
334 .map_err(|err| {
335 let working_set = StateWorkingSet::new(&self.state);
336 Error::from(format_cli_error(None, &working_set, &err, None))
337 })
338 }
339
340 pub fn add_custom_commands(&mut self) -> Result<(), Error> {
342 self.add_commands(vec![
343 Box::new(ReverseProxyCommand::new()),
344 Box::new(StaticCommand::new()),
345 Box::new(ToSse {}),
346 Box::new(MjCommand::new()),
347 Box::new(MjCompileCommand::new()),
348 Box::new(MjRenderCommand::new()),
349 Box::new(HighlightCommand::new()),
350 Box::new(HighlightThemeCommand::new()),
351 Box::new(HighlightLangCommand::new()),
352 Box::new(MdCommand::new()),
353 Box::new(PrintCommand::new()),
354 Box::new(RunNuCommand::new()),
355 Box::new(BusPubCommand::new(self.bus.clone())),
356 Box::new(BusSubCommand::new(self.bus.clone())),
357 ])
358 }
359
    /// Registers the cross-stream store commands (core, read, write) on this engine's state.
    ///
    /// Only compiled when the `cross-stream` feature is enabled.
    ///
    /// # Errors
    ///
    /// Returns the first registration error, converted to this crate's
    /// `Error` via its string form. The engine state is restored in either case.
    #[cfg(feature = "cross-stream")]
    pub fn add_store_commands(&mut self, store: &xs::store::Store) -> Result<(), Error> {
        // `xs::nu::Engine` holds its `EngineState` by value, so our state is moved
        // into it and a fresh placeholder state is left in `self` meanwhile.
        let mut xe = xs::nu::Engine {
            state: std::mem::replace(&mut self.state, EngineState::new()),
        };
        // Core commands first, then read commands in stream mode, then write
        // commands in direct-append mode; the chain stops at the first error.
        let res = xs::nu::add_core_commands(&mut xe, store)
            .and_then(|()| xs::nu::add_read_commands(&mut xe, store, xs::nu::ReadMode::Stream))
            .and_then(|()| {
                xs::nu::add_write_commands(
                    &mut xe,
                    store,
                    xs::nu::AppendMode::Direct(serde_json::json!({})),
                )
            });
        // Move the state back before looking at `res`, so `self.state` is
        // restored even when registration failed.
        self.state = xe.state;
        res.map_err(|e| Error::from(e.to_string()))
    }
387
388 #[cfg(feature = "cross-stream")]
390 pub fn add_store_mj_commands(&mut self, store: &xs::store::Store) -> Result<(), Error> {
391 self.add_commands(vec![
392 Box::new(MjCommand::with_store(store.clone())),
393 Box::new(MjCompileCommand::with_store(store.clone())),
394 ])
395 }
396}
397
398pub fn script_to_engine(base: &Engine, script: &str, file: Option<&Path>) -> Option<Engine> {
401 let mut engine = base.clone();
402 engine.sse_cancel_token = CancellationToken::new();
404
405 if let Err(e) = engine.parse_closure(script, file) {
406 log_error(&nu_utils::strip_ansi_string_likely(e.to_string()));
407 return None;
408 }
409
410 Some(engine)
411}