Skip to main content

http_nu/
engine.rs

1use std::path::Path;
2use std::sync::{atomic::AtomicBool, Arc};
3
4use tokio_util::sync::CancellationToken;
5
6use nu_cli::{add_cli_context, gather_parent_env_vars};
7use nu_cmd_lang::create_default_context;
8use nu_command::add_shell_command_context;
9use nu_engine::eval_block_with_early_return;
10use nu_parser::parse;
11use nu_plugin_engine::{GetPlugin, PluginDeclaration};
12use nu_protocol::engine::Command;
13use nu_protocol::format_cli_error;
14use nu_protocol::{
15    debugger::WithoutDebug,
16    engine::{Closure, EngineState, Redirection, Stack, StateWorkingSet},
17    shell_error::generic::GenericError,
18    OutDest, PipelineData, PluginIdentity, RegisteredPlugin, ShellError, Signals, Span, Type,
19    Value,
20};
21
22use crate::commands::{
23    HighlightCommand, HighlightLangCommand, HighlightThemeCommand, MdCommand, MjCommand,
24    MjCompileCommand, MjRenderCommand, PrintCommand, ReverseProxyCommand, StaticCommand, ToSse,
25};
26use crate::logging::log_error;
27use crate::stdlib::load_http_nu_stdlib;
28use crate::Error;
29
30/// CLI options exposed to scripts as the `$HTTP_NU` const
31#[derive(Clone, Default)]
32pub struct HttpNuOptions {
33    pub dev: bool,
34    pub datastar: bool,
35    pub watch: bool,
36    pub store: Option<String>,
37    pub topic: Option<String>,
38    pub expose: Option<String>,
39    pub tls: Option<String>,
40    pub services: bool,
41}
42
43#[derive(Clone)]
44pub struct Engine {
45    pub state: EngineState,
46    pub closure: Option<Closure>,
47    /// Cancellation token for SSE streams
48    pub sse_cancel_token: CancellationToken,
49}
50
51impl Engine {
52    pub fn new() -> Result<Self, Error> {
53        let mut engine_state = create_default_context();
54
55        engine_state = add_shell_command_context(engine_state);
56        engine_state = add_cli_context(engine_state);
57        engine_state = nu_cmd_extra::extra::add_extra_command_context(engine_state);
58
59        load_http_nu_stdlib(&mut engine_state)?;
60        nu_std::load_standard_library(&mut engine_state)?;
61
62        let init_cwd = std::env::current_dir()?;
63        gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
64
65        Ok(Self {
66            state: engine_state,
67            closure: None,
68            sse_cancel_token: CancellationToken::new(),
69        })
70    }
71
72    /// Sets `$HTTP_NU` const with server configuration for stdlib modules
73    pub fn set_http_nu_const(&mut self, options: &HttpNuOptions) -> Result<(), Error> {
74        let span = Span::unknown();
75        let opt_str = |v: &Option<String>| match v {
76            Some(s) => Value::string(s, span),
77            None => Value::nothing(span),
78        };
79        let record = Value::record(
80            nu_protocol::record! {
81                "dev" => Value::bool(options.dev, span),
82                "datastar" => Value::bool(options.datastar, span),
83                "watch" => Value::bool(options.watch, span),
84                "store" => opt_str(&options.store),
85                "topic" => opt_str(&options.topic),
86                "expose" => opt_str(&options.expose),
87                "tls" => opt_str(&options.tls),
88                "services" => Value::bool(options.services, span),
89            },
90            span,
91        );
92        let mut working_set = StateWorkingSet::new(&self.state);
93        let var_id = working_set.add_variable(b"$HTTP_NU".into(), span, Type::record(), false);
94        working_set.set_variable_const_val(var_id, record);
95        self.state.merge_delta(working_set.render())?;
96        Ok(())
97    }
98
99    pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
100        let mut working_set = StateWorkingSet::new(&self.state);
101        for command in commands {
102            working_set.add_decl(command);
103        }
104        self.state.merge_delta(working_set.render())?;
105        Ok(())
106    }
107
108    /// Load a Nushell plugin from the given path
109    pub fn load_plugin(&mut self, path: &Path) -> Result<(), Error> {
110        // Canonicalize the path
111        let path = path.canonicalize().map_err(|e| {
112            Error::from(format!("Failed to canonicalize plugin path {path:?}: {e}"))
113        })?;
114
115        // Create the plugin identity
116        let identity = PluginIdentity::new(&path, None).map_err(|_| {
117            Error::from(format!(
118                "Invalid plugin path {path:?}: must be named nu_plugin_*"
119            ))
120        })?;
121
122        let mut working_set = StateWorkingSet::new(&self.state);
123
124        // Add plugin to working set and get handle
125        let plugin = nu_plugin_engine::add_plugin_to_working_set(&mut working_set, &identity)?;
126
127        // Merge working set to make plugin available
128        self.state.merge_delta(working_set.render())?;
129
130        // Spawn the plugin to get its signatures
131        let interface = plugin.clone().get_plugin(None)?;
132
133        // Set plugin metadata
134        plugin.set_metadata(Some(interface.get_metadata()?));
135
136        // Add command declarations from plugin signatures
137        let mut working_set = StateWorkingSet::new(&self.state);
138        for signature in interface.get_signature()? {
139            let decl = PluginDeclaration::new(plugin.clone(), signature);
140            working_set.add_decl(Box::new(decl));
141        }
142        self.state.merge_delta(working_set.render())?;
143
144        Ok(())
145    }
146
147    pub fn parse_closure(&mut self, script: &str, file: Option<&Path>) -> Result<(), Error> {
148        self.state.file = file.map(|p| p.to_path_buf());
149        let fname = file.map(|p| p.to_string_lossy().into_owned());
150        let mut working_set = StateWorkingSet::new(&self.state);
151        let block = parse(&mut working_set, fname.as_deref(), script.as_bytes(), false);
152
153        // Handle parse errors
154        if let Some(err) = working_set.parse_errors.first() {
155            let shell_error = ShellError::Generic(GenericError::new(
156                "Parse error",
157                format!("{err:?}"),
158                err.span(),
159            ));
160            return Err(Error::from(format_cli_error(
161                None,
162                &working_set,
163                &shell_error,
164                None,
165            )));
166        }
167
168        // Handle compile errors
169        if let Some(err) = working_set.compile_errors.first() {
170            let shell_error = ShellError::Generic(GenericError::new_internal(
171                format!("Compile error {err}"),
172                "",
173            ));
174            return Err(Error::from(format_cli_error(
175                None,
176                &working_set,
177                &shell_error,
178                None,
179            )));
180        }
181
182        self.state.merge_delta(working_set.render())?;
183
184        let mut stack = Stack::new();
185        let result = eval_block_with_early_return::<WithoutDebug>(
186            &self.state,
187            &mut stack,
188            &block,
189            PipelineData::empty(),
190        )
191        .map_err(|err| {
192            let working_set = StateWorkingSet::new(&self.state);
193            Error::from(format_cli_error(None, &working_set, &err, None))
194        })?;
195
196        let closure = result
197            .body
198            .into_value(Span::unknown())
199            .map_err(|err| {
200                let working_set = StateWorkingSet::new(&self.state);
201                Error::from(format_cli_error(None, &working_set, &err, None))
202            })?
203            .into_closure()
204            .map_err(|err| {
205                let working_set = StateWorkingSet::new(&self.state);
206                Error::from(format_cli_error(None, &working_set, &err, None))
207            })?;
208
209        // Verify closure accepts exactly one argument
210        let block = self.state.get_block(closure.block_id);
211        if block.signature.required_positional.len() != 1 {
212            return Err(format!(
213                "Closure must accept exactly one request argument, found {}",
214                block.signature.required_positional.len()
215            )
216            .into());
217        }
218
219        self.state.merge_env(&mut stack)?;
220
221        self.closure = Some(closure);
222        Ok(())
223    }
224
225    /// Sets the interrupt signal for the engine
226    pub fn set_signals(&mut self, interrupt: Arc<AtomicBool>) {
227        self.state.set_signals(Signals::new(interrupt));
228    }
229
230    /// Sets NU_LIB_DIRS const for module resolution
231    pub fn set_lib_dirs(&mut self, paths: &[std::path::PathBuf]) -> Result<(), Error> {
232        if paths.is_empty() {
233            return Ok(());
234        }
235        let span = Span::unknown();
236        let vals: Vec<Value> = paths
237            .iter()
238            .map(|p| Value::string(p.to_string_lossy(), span))
239            .collect();
240
241        let mut working_set = StateWorkingSet::new(&self.state);
242        let var_id = working_set.add_variable(
243            b"$NU_LIB_DIRS".into(),
244            span,
245            Type::List(Box::new(Type::String)),
246            false,
247        );
248        working_set.set_variable_const_val(var_id, Value::list(vals, span));
249        self.state.merge_delta(working_set.render())?;
250        Ok(())
251    }
252
253    /// Evaluate a script string and return the result value
254    pub fn eval(&mut self, script: &str, file: Option<&Path>) -> Result<Value, Error> {
255        self.state.file = file.map(|p| p.to_path_buf());
256        let fname = file.map(|p| p.to_string_lossy().into_owned());
257        let mut working_set = StateWorkingSet::new(&self.state);
258        let block = parse(&mut working_set, fname.as_deref(), script.as_bytes(), false);
259
260        if let Some(err) = working_set.parse_errors.first() {
261            let shell_error = ShellError::Generic(GenericError::new(
262                "Parse error",
263                format!("{err:?}"),
264                err.span(),
265            ));
266            return Err(Error::from(format_cli_error(
267                None,
268                &working_set,
269                &shell_error,
270                None,
271            )));
272        }
273
274        if let Some(err) = working_set.compile_errors.first() {
275            let shell_error = ShellError::Generic(GenericError::new_internal(
276                format!("Compile error {err}"),
277                "",
278            ));
279            return Err(Error::from(format_cli_error(
280                None,
281                &working_set,
282                &shell_error,
283                None,
284            )));
285        }
286
287        // Clone engine state and merge the parsed block
288        let mut engine_state = self.state.clone();
289        engine_state.merge_delta(working_set.render())?;
290
291        let mut stack = Stack::new();
292        let result = eval_block_with_early_return::<WithoutDebug>(
293            &engine_state,
294            &mut stack,
295            &block,
296            PipelineData::empty(),
297        )
298        .map_err(|err| {
299            let working_set = StateWorkingSet::new(&engine_state);
300            Error::from(format_cli_error(None, &working_set, &err, None))
301        })?;
302
303        result.body.into_value(Span::unknown()).map_err(|err| {
304            let working_set = StateWorkingSet::new(&engine_state);
305            Error::from(format_cli_error(None, &working_set, &err, None))
306        })
307    }
308
309    /// Run the parsed closure with input value and pipeline data
310    pub fn run_closure(
311        &self,
312        input: Value,
313        pipeline_data: PipelineData,
314    ) -> Result<PipelineData, Error> {
315        let closure = self.closure.as_ref().ok_or("Closure not parsed")?;
316
317        let mut stack = Stack::new().captures_to_stack(closure.captures.clone());
318        let mut stack =
319            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
320        let block = self.state.get_block(closure.block_id);
321
322        stack.add_var(
323            block.signature.required_positional[0].var_id.unwrap(),
324            input,
325        );
326
327        eval_block_with_early_return::<WithoutDebug>(&self.state, &mut stack, block, pipeline_data)
328            .map(|exec_data| exec_data.body)
329            .map_err(|err| {
330                let working_set = StateWorkingSet::new(&self.state);
331                Error::from(format_cli_error(None, &working_set, &err, None))
332            })
333    }
334
335    /// Adds http-nu custom commands to the engine
336    pub fn add_custom_commands(&mut self) -> Result<(), Error> {
337        self.add_commands(vec![
338            Box::new(ReverseProxyCommand::new()),
339            Box::new(StaticCommand::new()),
340            Box::new(ToSse {}),
341            Box::new(MjCommand::new()),
342            Box::new(MjCompileCommand::new()),
343            Box::new(MjRenderCommand::new()),
344            Box::new(HighlightCommand::new()),
345            Box::new(HighlightThemeCommand::new()),
346            Box::new(HighlightLangCommand::new()),
347            Box::new(MdCommand::new()),
348            Box::new(PrintCommand::new()),
349        ])
350    }
351
352    /// Adds cross.stream store commands (.cat, .append, .cas, .last) to the engine
353    #[cfg(feature = "cross-stream")]
354    pub fn add_store_commands(&mut self, store: &xs::store::Store) -> Result<(), Error> {
355        self.add_commands(vec![
356            Box::new(xs::nu::commands::cat_stream_command::CatStreamCommand::new(
357                store.clone(),
358            )),
359            Box::new(xs::nu::commands::append_command::AppendCommand::new(
360                store.clone(),
361                serde_json::json!({}),
362            )),
363            Box::new(xs::nu::commands::cas_command::CasCommand::new(
364                store.clone(),
365            )),
366            Box::new(xs::nu::commands::last_stream_command::LastStreamCommand::new(store.clone())),
367            Box::new(xs::nu::commands::get_command::GetCommand::new(
368                store.clone(),
369            )),
370            Box::new(xs::nu::commands::remove_command::RemoveCommand::new(
371                store.clone(),
372            )),
373            Box::new(xs::nu::commands::scru128_command::Scru128Command::new()),
374        ])
375    }
376
377    /// Re-registers .mj commands with store access for stream-backed template loading
378    #[cfg(feature = "cross-stream")]
379    pub fn add_store_mj_commands(&mut self, store: &xs::store::Store) -> Result<(), Error> {
380        self.add_commands(vec![
381            Box::new(MjCommand::with_store(store.clone())),
382            Box::new(MjCompileCommand::with_store(store.clone())),
383        ])
384    }
385}
386
387/// Creates an engine from a script by cloning a base engine and parsing the closure.
388/// On error, prints to stderr and emits JSON to stdout, returning None.
389pub fn script_to_engine(base: &Engine, script: &str, file: Option<&Path>) -> Option<Engine> {
390    let mut engine = base.clone();
391    // Fresh cancellation token for this engine instance
392    engine.sse_cancel_token = CancellationToken::new();
393
394    if let Err(e) = engine.parse_closure(script, file) {
395        log_error(&nu_utils::strip_ansi_string_likely(e.to_string()));
396        return None;
397    }
398
399    Some(engine)
400}