Skip to main content

http_nu/
engine.rs

1use std::path::Path;
2use std::sync::{atomic::AtomicBool, Arc};
3
4use tokio_util::sync::CancellationToken;
5
6use nu_cli::{add_cli_context, gather_parent_env_vars};
7use nu_cmd_lang::create_default_context;
8use nu_command::add_shell_command_context;
9use nu_engine::eval_block_with_early_return;
10use nu_parser::parse;
11use nu_plugin_engine::{GetPlugin, PluginDeclaration};
12use nu_protocol::engine::Command;
13use nu_protocol::format_cli_error;
14use nu_protocol::{
15    debugger::WithoutDebug,
16    engine::{Closure, EngineState, Redirection, Stack, StateWorkingSet},
17    shell_error::generic::GenericError,
18    OutDest, PipelineData, PluginIdentity, RegisteredPlugin, ShellError, Signals, Span, Type,
19    Value,
20};
21
22use crate::bus::Bus;
23use crate::commands::{
24    BusPubCommand, BusSubCommand, HighlightCommand, HighlightLangCommand, HighlightThemeCommand,
25    MdCommand, MjCommand, MjCompileCommand, MjRenderCommand, PrintCommand, ReverseProxyCommand,
26    RunNuCommand, StaticCommand, ToSse,
27};
28use crate::logging::log_error;
29use crate::stdlib::load_http_nu_stdlib;
30use crate::Error;
31
/// CLI options exposed to scripts as the `$HTTP_NU` const.
///
/// `Engine::set_http_nu_const` copies every field into a record key of the
/// same name; optional fields that are `None` are published as `nothing`.
#[derive(Clone, Debug, Default)]
pub struct HttpNuOptions {
    /// Published as the boolean `dev` key.
    pub dev: bool,
    /// Published as the boolean `datastar` key.
    pub datastar: bool,
    /// Published as the boolean `watch` key.
    pub watch: bool,
    /// Published as the `store` key (string, or `nothing` when unset).
    pub store: Option<String>,
    /// Published as the `topic` key (string, or `nothing` when unset).
    pub topic: Option<String>,
    /// Published as the `expose` key (string, or `nothing` when unset).
    pub expose: Option<String>,
    /// Published as the `tls` key (string, or `nothing` when unset).
    pub tls: Option<String>,
    /// Published as the boolean `services` key.
    pub services: bool,
}
44
/// A Nushell engine plus the per-server handles that travel with it.
///
/// The type is `Clone`; note that `bus` is an `Arc`, so clones keep pointing
/// at the same bus instance.
#[derive(Clone)]
pub struct Engine {
    /// Nushell engine state that commands, constants and parsed blocks are
    /// merged into.
    pub state: EngineState,
    /// Handler closure stored by `parse_closure`; `None` until a script has
    /// been parsed successfully.
    pub closure: Option<Closure>,
    /// Local in-process pub/sub bus for ephemeral UI events
    pub bus: Arc<Bus>,
    /// Cancellation token for SSE streams
    pub sse_cancel_token: CancellationToken,
}
54
55impl Engine {
56    pub fn new() -> Result<Self, Error> {
57        let mut engine_state = create_default_context();
58
59        engine_state = add_shell_command_context(engine_state);
60        engine_state = add_cli_context(engine_state);
61        engine_state = nu_cmd_extra::extra::add_extra_command_context(engine_state);
62
63        load_http_nu_stdlib(&mut engine_state)?;
64        nu_std::load_standard_library(&mut engine_state)?;
65
66        let init_cwd = std::env::current_dir()?;
67        gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
68
69        Ok(Self {
70            state: engine_state,
71            closure: None,
72            bus: Arc::new(Bus::new(64)),
73            sse_cancel_token: CancellationToken::new(),
74        })
75    }
76
77    /// Sets `$HTTP_NU` const with server configuration for stdlib modules
78    pub fn set_http_nu_const(&mut self, options: &HttpNuOptions) -> Result<(), Error> {
79        let span = Span::unknown();
80        let opt_str = |v: &Option<String>| match v {
81            Some(s) => Value::string(s, span),
82            None => Value::nothing(span),
83        };
84        let record = Value::record(
85            nu_protocol::record! {
86                "dev" => Value::bool(options.dev, span),
87                "datastar" => Value::bool(options.datastar, span),
88                "watch" => Value::bool(options.watch, span),
89                "store" => opt_str(&options.store),
90                "topic" => opt_str(&options.topic),
91                "expose" => opt_str(&options.expose),
92                "tls" => opt_str(&options.tls),
93                "services" => Value::bool(options.services, span),
94            },
95            span,
96        );
97        let mut working_set = StateWorkingSet::new(&self.state);
98        let var_id = working_set.add_variable(b"$HTTP_NU".into(), span, Type::record(), false);
99        working_set.set_variable_const_val(var_id, record);
100        self.state.merge_delta(working_set.render())?;
101        Ok(())
102    }
103
104    pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
105        let mut working_set = StateWorkingSet::new(&self.state);
106        for command in commands {
107            working_set.add_decl(command);
108        }
109        self.state.merge_delta(working_set.render())?;
110        Ok(())
111    }
112
113    /// Load a Nushell plugin from the given path
114    pub fn load_plugin(&mut self, path: &Path) -> Result<(), Error> {
115        // Canonicalize the path
116        let path = path.canonicalize().map_err(|e| {
117            Error::from(format!("Failed to canonicalize plugin path {path:?}: {e}"))
118        })?;
119
120        // Create the plugin identity
121        let identity = PluginIdentity::new(&path, None).map_err(|_| {
122            Error::from(format!(
123                "Invalid plugin path {path:?}: must be named nu_plugin_*"
124            ))
125        })?;
126
127        let mut working_set = StateWorkingSet::new(&self.state);
128
129        // Add plugin to working set and get handle
130        let plugin = nu_plugin_engine::add_plugin_to_working_set(&mut working_set, &identity)?;
131
132        // Merge working set to make plugin available
133        self.state.merge_delta(working_set.render())?;
134
135        // Spawn the plugin to get its signatures
136        let interface = plugin.clone().get_plugin(None)?;
137
138        // Set plugin metadata
139        plugin.set_metadata(Some(interface.get_metadata()?));
140
141        // Add command declarations from plugin signatures
142        let mut working_set = StateWorkingSet::new(&self.state);
143        for signature in interface.get_signature()? {
144            let decl = PluginDeclaration::new(plugin.clone(), signature);
145            working_set.add_decl(Box::new(decl));
146        }
147        self.state.merge_delta(working_set.render())?;
148
149        Ok(())
150    }
151
152    pub fn parse_closure(&mut self, script: &str, file: Option<&Path>) -> Result<(), Error> {
153        self.state.file = file.map(|p| p.to_path_buf());
154        let fname = file.map(|p| p.to_string_lossy().into_owned());
155        let mut working_set = StateWorkingSet::new(&self.state);
156        let block = parse(&mut working_set, fname.as_deref(), script.as_bytes(), false);
157
158        // Handle parse errors
159        if let Some(err) = working_set.parse_errors.first() {
160            let shell_error = ShellError::Generic(GenericError::new(
161                "Parse error",
162                format!("{err:?}"),
163                err.span(),
164            ));
165            return Err(Error::from(format_cli_error(
166                None,
167                &working_set,
168                &shell_error,
169                None,
170            )));
171        }
172
173        // Handle compile errors
174        if let Some(err) = working_set.compile_errors.first() {
175            let shell_error = ShellError::Generic(GenericError::new_internal(
176                format!("Compile error {err}"),
177                "",
178            ));
179            return Err(Error::from(format_cli_error(
180                None,
181                &working_set,
182                &shell_error,
183                None,
184            )));
185        }
186
187        self.state.merge_delta(working_set.render())?;
188
189        let mut stack = Stack::new();
190        let result = eval_block_with_early_return::<WithoutDebug>(
191            &self.state,
192            &mut stack,
193            &block,
194            PipelineData::empty(),
195        )
196        .map_err(|err| {
197            let working_set = StateWorkingSet::new(&self.state);
198            Error::from(format_cli_error(None, &working_set, &err, None))
199        })?;
200
201        let closure = result
202            .body
203            .into_value(Span::unknown())
204            .map_err(|err| {
205                let working_set = StateWorkingSet::new(&self.state);
206                Error::from(format_cli_error(None, &working_set, &err, None))
207            })?
208            .into_closure()
209            .map_err(|err| {
210                let working_set = StateWorkingSet::new(&self.state);
211                Error::from(format_cli_error(None, &working_set, &err, None))
212            })?;
213
214        // Verify closure accepts exactly one argument
215        let block = self.state.get_block(closure.block_id);
216        if block.signature.required_positional.len() != 1 {
217            return Err(format!(
218                "Closure must accept exactly one request argument, found {}",
219                block.signature.required_positional.len()
220            )
221            .into());
222        }
223
224        self.state.merge_env(&mut stack)?;
225
226        self.closure = Some(closure);
227        Ok(())
228    }
229
230    /// Sets the interrupt signal for the engine
231    pub fn set_signals(&mut self, interrupt: Arc<AtomicBool>) {
232        self.state.set_signals(Signals::new(interrupt));
233    }
234
235    /// Sets NU_LIB_DIRS const for module resolution
236    pub fn set_lib_dirs(&mut self, paths: &[std::path::PathBuf]) -> Result<(), Error> {
237        if paths.is_empty() {
238            return Ok(());
239        }
240        let span = Span::unknown();
241        let vals: Vec<Value> = paths
242            .iter()
243            .map(|p| Value::string(p.to_string_lossy(), span))
244            .collect();
245
246        let mut working_set = StateWorkingSet::new(&self.state);
247        let var_id = working_set.add_variable(
248            b"$NU_LIB_DIRS".into(),
249            span,
250            Type::List(Box::new(Type::String)),
251            false,
252        );
253        working_set.set_variable_const_val(var_id, Value::list(vals, span));
254        self.state.merge_delta(working_set.render())?;
255        Ok(())
256    }
257
258    /// Evaluate a script string and return the result value
259    pub fn eval(&mut self, script: &str, file: Option<&Path>) -> Result<Value, Error> {
260        self.state.file = file.map(|p| p.to_path_buf());
261        let fname = file.map(|p| p.to_string_lossy().into_owned());
262        let mut working_set = StateWorkingSet::new(&self.state);
263        let block = parse(&mut working_set, fname.as_deref(), script.as_bytes(), false);
264
265        if let Some(err) = working_set.parse_errors.first() {
266            let shell_error = ShellError::Generic(GenericError::new(
267                "Parse error",
268                format!("{err:?}"),
269                err.span(),
270            ));
271            return Err(Error::from(format_cli_error(
272                None,
273                &working_set,
274                &shell_error,
275                None,
276            )));
277        }
278
279        if let Some(err) = working_set.compile_errors.first() {
280            let shell_error = ShellError::Generic(GenericError::new_internal(
281                format!("Compile error {err}"),
282                "",
283            ));
284            return Err(Error::from(format_cli_error(
285                None,
286                &working_set,
287                &shell_error,
288                None,
289            )));
290        }
291
292        // Clone engine state and merge the parsed block
293        let mut engine_state = self.state.clone();
294        engine_state.merge_delta(working_set.render())?;
295
296        let mut stack = Stack::new();
297        let result = eval_block_with_early_return::<WithoutDebug>(
298            &engine_state,
299            &mut stack,
300            &block,
301            PipelineData::empty(),
302        )
303        .map_err(|err| {
304            let working_set = StateWorkingSet::new(&engine_state);
305            Error::from(format_cli_error(None, &working_set, &err, None))
306        })?;
307
308        result.body.into_value(Span::unknown()).map_err(|err| {
309            let working_set = StateWorkingSet::new(&engine_state);
310            Error::from(format_cli_error(None, &working_set, &err, None))
311        })
312    }
313
314    /// Run the parsed closure with input value and pipeline data
315    pub fn run_closure(
316        &self,
317        input: Value,
318        pipeline_data: PipelineData,
319    ) -> Result<PipelineData, Error> {
320        let closure = self.closure.as_ref().ok_or("Closure not parsed")?;
321
322        let mut stack = Stack::new().captures_to_stack(closure.captures.clone());
323        let mut stack =
324            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);
325        let block = self.state.get_block(closure.block_id);
326
327        stack.add_var(
328            block.signature.required_positional[0].var_id.unwrap(),
329            input,
330        );
331
332        eval_block_with_early_return::<WithoutDebug>(&self.state, &mut stack, block, pipeline_data)
333            .map(|exec_data| exec_data.body)
334            .map_err(|err| {
335                let working_set = StateWorkingSet::new(&self.state);
336                Error::from(format_cli_error(None, &working_set, &err, None))
337            })
338    }
339
340    /// Adds http-nu custom commands to the engine
341    pub fn add_custom_commands(&mut self) -> Result<(), Error> {
342        self.add_commands(vec![
343            Box::new(ReverseProxyCommand::new()),
344            Box::new(StaticCommand::new()),
345            Box::new(ToSse {}),
346            Box::new(MjCommand::new()),
347            Box::new(MjCompileCommand::new()),
348            Box::new(MjRenderCommand::new()),
349            Box::new(HighlightCommand::new()),
350            Box::new(HighlightThemeCommand::new()),
351            Box::new(HighlightLangCommand::new()),
352            Box::new(MdCommand::new()),
353            Box::new(PrintCommand::new()),
354            Box::new(RunNuCommand::new()),
355            Box::new(BusPubCommand::new(self.bus.clone())),
356            Box::new(BusSubCommand::new(self.bus.clone())),
357        ])
358    }
359
360    /// Adds cross.stream store commands (.cat, .append, .cas, .last) to the engine
361    #[cfg(feature = "cross-stream")]
362    pub fn add_store_commands(&mut self, store: &xs::store::Store) -> Result<(), Error> {
363        // Delegate to cross.stream's canonical command surface (core + read +
364        // write) rather than maintaining our own list. That parallel list is
365        // exactly what silently fell behind when xs added `.import` /
366        // `.cas-post`; routing through xs's helpers keeps http-nu's eval and
367        // handlers in lockstep with the store's own command set. We expose the
368        // streaming readers (`--follow`) and a direct, untagged `.append`.
369        //
370        // The helpers take `xs::nu::Engine`, a newtype over EngineState, so
371        // borrow our state into one for the duration and move it back.
372        let mut xe = xs::nu::Engine {
373            state: std::mem::replace(&mut self.state, EngineState::new()),
374        };
375        let res = xs::nu::add_core_commands(&mut xe, store)
376            .and_then(|()| xs::nu::add_read_commands(&mut xe, store, xs::nu::ReadMode::Stream))
377            .and_then(|()| {
378                xs::nu::add_write_commands(&mut xe, store, xs::nu::AppendMode::Direct)
379            });
380        self.state = xe.state;
381        res.map_err(|e| Error::from(e.to_string()))
382    }
383
384    /// Re-registers .mj commands with store access for stream-backed template loading
385    #[cfg(feature = "cross-stream")]
386    pub fn add_store_mj_commands(&mut self, store: &xs::store::Store) -> Result<(), Error> {
387        self.add_commands(vec![
388            Box::new(MjCommand::with_store(store.clone())),
389            Box::new(MjCompileCommand::with_store(store.clone())),
390        ])
391    }
392}
393
394/// Creates an engine from a script by cloning a base engine and parsing the closure.
395/// On error, prints to stderr and emits JSON to stdout, returning None.
396pub fn script_to_engine(base: &Engine, script: &str, file: Option<&Path>) -> Option<Engine> {
397    let mut engine = base.clone();
398    // Fresh cancellation token for this engine instance
399    engine.sse_cancel_token = CancellationToken::new();
400
401    if let Err(e) = engine.parse_closure(script, file) {
402        log_error(&nu_utils::strip_ansi_string_likely(e.to_string()));
403        return None;
404    }
405
406    Some(engine)
407}