Skip to main content

http_nu/
commands.rs

1use crate::bus::Bus;
2use crate::logging::log_print;
3use crate::response::{Response, ResponseBodyType};
4use nu_engine::command_prelude::*;
5use nu_protocol::{
6    shell_error::generic::GenericError, ByteStream, ByteStreamType, Category, Config, CustomValue,
7    PipelineData, PipelineMetadata, ShellError, Signature, Span, SyntaxShape, Type, Value,
8};
9use serde::{Deserialize, Serialize};
10use std::cell::RefCell;
11use std::collections::HashMap;
12use std::io::Read;
13use std::path::PathBuf;
14use tokio::sync::oneshot;
15
16use minijinja::{path_loader, AutoEscape, Environment};
17use std::sync::{Arc, OnceLock, RwLock};
18
19use syntect::html::{ClassStyle, ClassedHTMLGenerator};
20use syntect::parsing::SyntaxSet;
21use syntect::util::LinesWithEndings;
22
23// === Template Cache ===
24
/// Shared map from a template's 128-bit cache key to its compiled minijinja
/// environment. The `RwLock` allows concurrent lookups; insertion takes the write lock.
type TemplateCache = RwLock<HashMap<u128, Arc<Environment<'static>>>>;

/// Process-wide storage for the template cache, created on first use by `get_cache`.
static TEMPLATE_CACHE: OnceLock<TemplateCache> = OnceLock::new();
28
29fn get_cache() -> &'static TemplateCache {
30    TEMPLATE_CACHE.get_or_init(|| RwLock::new(HashMap::new()))
31}
32
33fn hash_source_and_path(source: &str, base_dir: &std::path::Path) -> u128 {
34    let mut data = source.as_bytes().to_vec();
35    data.extend_from_slice(base_dir.to_string_lossy().as_bytes());
36    xxhash_rust::xxh3::xxh3_128(&data)
37}
38
39/// Compile template and insert into cache. Returns hash.
40fn compile_template(source: &str, base_dir: &std::path::Path) -> Result<u128, minijinja::Error> {
41    compile_template_with_loader(source, base_dir, path_loader(base_dir))
42}
43
44/// Compile template with a custom loader and insert into cache. Returns hash.
45fn compile_template_with_loader<F>(
46    source: &str,
47    base_dir: &std::path::Path,
48    loader: F,
49) -> Result<u128, minijinja::Error>
50where
51    F: Fn(&str) -> Result<Option<String>, minijinja::Error> + Send + Sync + 'static,
52{
53    let hash = hash_source_and_path(source, base_dir);
54
55    let mut cache = get_cache().write().unwrap();
56    if cache.contains_key(&hash) {
57        return Ok(hash);
58    }
59
60    let mut env = Environment::new();
61    env.set_auto_escape_callback(|_| AutoEscape::Html);
62    env.set_loader(loader);
63    env.add_template_owned("template".to_string(), source.to_string())?;
64    cache.insert(hash, Arc::new(env));
65    Ok(hash)
66}
67
68/// Get compiled template from cache by hash.
69fn get_compiled(hash: u128) -> Option<Arc<Environment<'static>>> {
70    get_cache().read().unwrap().get(&hash).map(Arc::clone)
71}
72
/// Fetch the content of the most recent frame on `topic` as a UTF-8 string.
///
/// Returns `None` when the topic has no frame, the frame carries no content hash,
/// the content cannot be read from the store, or the bytes are not valid UTF-8.
#[cfg(feature = "cross-stream")]
fn load_topic_content(store: &xs::store::Store, topic: &str) -> Option<String> {
    let newest_only = xs::store::ReadOptions::builder()
        .follow(xs::store::FollowOption::Off)
        .topic(topic.to_string())
        .last(1_usize)
        .build();
    store
        .read_sync(newest_only)
        .last()
        .and_then(|frame| frame.hash)
        .and_then(|hash| store.cas_read_sync(&hash).ok())
        .and_then(|bytes| String::from_utf8(bytes).ok())
}
86
87// === CompiledTemplate CustomValue ===
88
/// Handle to a template held in the global template cache.
///
/// Only the cache key is stored; the compiled environment itself stays in the cache.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompiledTemplate {
    /// Cache key under which the compiled environment is stored.
    hash: u128,
}
93
94impl CompiledTemplate {
95    /// Render this template with the given context
96    pub fn render(&self, context: &minijinja::Value) -> Result<String, minijinja::Error> {
97        let env = get_compiled(self.hash).expect("template not in cache");
98        let tmpl = env.get_template("template")?;
99        tmpl.render(context)
100    }
101}
102
103#[typetag::serde]
104impl CustomValue for CompiledTemplate {
105    fn clone_value(&self, span: Span) -> Value {
106        Value::custom(Box::new(self.clone()), span)
107    }
108
109    fn type_name(&self) -> String {
110        "CompiledTemplate".into()
111    }
112
113    fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
114        Ok(Value::string(format!("{:032x}", self.hash), span))
115    }
116
117    fn as_any(&self) -> &dyn std::any::Any {
118        self
119    }
120
121    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
122        self
123    }
124}
125
thread_local! {
    /// Per-thread slot holding the one-shot sender for the current response.
    /// Commands in this file `take()` the sender out of the slot and send a
    /// `Response` through it; an empty slot means there is nothing to send to.
    pub static RESPONSE_TX: RefCell<Option<oneshot::Sender<Response>>> = const { RefCell::new(None) };
}
129
/// The `.static` command: responds by serving static files from a directory.
#[derive(Clone)]
pub struct StaticCommand;

impl Default for StaticCommand {
    /// Equivalent to [`StaticCommand::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl StaticCommand {
    /// Create the command; it carries no state.
    pub fn new() -> Self {
        Self
    }
}
144
145impl Command for StaticCommand {
146    fn name(&self) -> &str {
147        ".static"
148    }
149
150    fn description(&self) -> &str {
151        "Serve static files from a directory"
152    }
153
154    fn signature(&self) -> Signature {
155        Signature::build(".static")
156            .required("root", SyntaxShape::String, "root directory path")
157            .required("path", SyntaxShape::String, "request path")
158            .named(
159                "fallback",
160                SyntaxShape::String,
161                "fallback file when request missing",
162                None,
163            )
164            .input_output_types(vec![(Type::Nothing, Type::Nothing)])
165            .category(Category::Custom("http".into()))
166    }
167
168    fn run(
169        &self,
170        engine_state: &EngineState,
171        stack: &mut Stack,
172        call: &Call,
173        _input: PipelineData,
174    ) -> Result<PipelineData, ShellError> {
175        let root: String = call.req(engine_state, stack, 0)?;
176        let path: String = call.req(engine_state, stack, 1)?;
177
178        let fallback: Option<String> = call.get_flag(engine_state, stack, "fallback")?;
179
180        let response = Response {
181            status: 200,
182            headers: HashMap::new(),
183            body_type: ResponseBodyType::Static {
184                root: PathBuf::from(root),
185                path,
186                fallback,
187            },
188        };
189
190        RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
191            if let Some(tx) = tx.borrow_mut().take() {
192                tx.send(response).map_err(|_| {
193                    ShellError::Generic(GenericError::new(
194                        "Failed to send response",
195                        "Channel closed",
196                        call.head,
197                    ))
198                })?;
199            }
200            Ok(())
201        })?;
202
203        Ok(PipelineData::Empty)
204    }
205}
206
/// Line terminator written after every field line and after each event.
const LINE_ENDING: &str = "\n";

/// The `to sse` command: formats records as `text/event-stream` text.
#[derive(Clone)]
pub struct ToSse;

impl Command for ToSse {
    fn name(&self) -> &str {
        "to sse"
    }

    /// Accepts a single record or a list of records and produces a string.
    fn signature(&self) -> Signature {
        Signature::build("to sse")
            .input_output_types(vec![
                (Type::record(), Type::String),
                (Type::List(Box::new(Type::record())), Type::String),
            ])
            .category(Category::Formats)
    }

    fn description(&self) -> &str {
        "Convert records into text/event-stream format"
    }

    fn search_terms(&self) -> Vec<&str> {
        vec!["sse", "server", "event"]
    }

    fn examples(&self) -> Vec<Example<'_>> {
        vec![Example {
            description: "Convert a record into a server-sent event",
            example: "{data: 'hello'} | to sse",
            result: Some(Value::test_string("data: hello\n\n")),
        }]
    }

    /// Convert the input into event-stream text, tagging the output metadata with
    /// the `text/event-stream` content type.
    ///
    /// List streams and list values become a string byte stream with one chunk per
    /// element; a single value becomes one string; empty input becomes an empty
    /// string; a byte stream is rejected with a type mismatch.
    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        let head = call.head;
        let config = stack.get_config(engine_state);
        match input {
            PipelineData::ListStream(stream, meta) => {
                let span = stream.span();
                // The mapping closure is `move`, so it gets its own handle to the config.
                let cfg = config.clone();
                let iter = stream
                    .into_iter()
                    .map(move |val| event_to_string(&cfg, val));
                let stream = ByteStream::from_result_iter(
                    iter,
                    span,
                    engine_state.signals().clone(),
                    ByteStreamType::String,
                );
                Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
            }
            PipelineData::Value(Value::List { vals, .. }, meta) => {
                let cfg = config.clone();
                let iter = vals.into_iter().map(move |val| event_to_string(&cfg, val));
                // The output stream is attributed to the call site, not the list's span.
                let span = head;
                let stream = ByteStream::from_result_iter(
                    iter,
                    span,
                    engine_state.signals().clone(),
                    ByteStreamType::String,
                );
                Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
            }
            PipelineData::Value(val, meta) => {
                // Any other single value: `event_to_string` rejects non-records.
                let out = event_to_string(&config, val)?;
                Ok(
                    Value::string(out, head)
                        .into_pipeline_data_with_metadata(update_metadata(meta)),
                )
            }
            PipelineData::Empty => Ok(PipelineData::Value(
                Value::string(String::new(), head),
                update_metadata(None),
            )),
            PipelineData::ByteStream(..) => Err(ShellError::TypeMismatch {
                err_message: "expected record input".into(),
                span: head,
            }),
        }
    }
}
296
297fn emit_data_lines(out: &mut String, s: &str) {
298    for line in s.lines() {
299        out.push_str("data: ");
300        out.push_str(line);
301        out.push_str(LINE_ENDING);
302    }
303}
304
305#[allow(clippy::result_large_err)]
306fn value_to_data_string(val: &Value, config: &Config) -> Result<String, ShellError> {
307    match val {
308        Value::String { val, .. } => Ok(val.clone()),
309        _ => {
310            let json_value = value_to_json(val, config).map_err(|err| {
311                ShellError::Generic(GenericError::new(
312                    err.to_string(),
313                    "failed to serialize json",
314                    Span::unknown(),
315                ))
316            })?;
317            serde_json::to_string(&json_value).map_err(|err| {
318                ShellError::Generic(GenericError::new(
319                    err.to_string(),
320                    "failed to serialize json",
321                    Span::unknown(),
322                ))
323            })
324        }
325    }
326}
327
328#[allow(clippy::result_large_err)]
329fn event_to_string(config: &Config, val: Value) -> Result<String, ShellError> {
330    let span = val.span();
331    let rec = match val {
332        Value::Record { val, .. } => val,
333        // Propagate the original error instead of creating a new "expected record" error
334        Value::Error { error, .. } => return Err(*error),
335        other => {
336            return Err(ShellError::TypeMismatch {
337                err_message: format!("expected record, got {}", other.get_type()),
338                span,
339            })
340        }
341    };
342    let mut out = String::new();
343    if let Some(event) = rec.get("event") {
344        if !matches!(event, Value::Nothing { .. }) {
345            out.push_str("event: ");
346            out.push_str(&event.to_expanded_string("", config));
347            out.push_str(LINE_ENDING);
348        }
349    }
350    if let Some(id) = rec.get("id") {
351        if !matches!(id, Value::Nothing { .. }) {
352            out.push_str("id: ");
353            out.push_str(&id.to_expanded_string("", config));
354            out.push_str(LINE_ENDING);
355        }
356    }
357    if let Some(retry) = rec.get("retry") {
358        if !matches!(retry, Value::Nothing { .. }) {
359            out.push_str("retry: ");
360            out.push_str(&retry.to_expanded_string("", config));
361            out.push_str(LINE_ENDING);
362        }
363    }
364    if let Some(data) = rec.get("data") {
365        if !matches!(data, Value::Nothing { .. }) {
366            match data {
367                Value::List { vals, .. } => {
368                    for item in vals {
369                        emit_data_lines(&mut out, &value_to_data_string(item, config)?);
370                    }
371                }
372                _ => {
373                    emit_data_lines(&mut out, &value_to_data_string(data, config)?);
374                }
375            }
376        }
377    }
378    out.push_str(LINE_ENDING);
379    Ok(out)
380}
381
382fn value_to_json(val: &Value, config: &Config) -> serde_json::Result<serde_json::Value> {
383    Ok(match val {
384        Value::Bool { val, .. } => serde_json::Value::Bool(*val),
385        Value::Int { val, .. } => serde_json::Value::from(*val),
386        Value::Float { val, .. } => serde_json::Number::from_f64(*val)
387            .map(serde_json::Value::Number)
388            .unwrap_or(serde_json::Value::Null),
389        Value::String { val, .. } => serde_json::Value::String(val.clone()),
390        Value::List { vals, .. } => serde_json::Value::Array(
391            vals.iter()
392                .map(|v| value_to_json(v, config))
393                .collect::<Result<Vec<_>, _>>()?,
394        ),
395        Value::Record { val, .. } => {
396            let mut map = serde_json::Map::new();
397            for (k, v) in val.iter() {
398                map.insert(k.clone(), value_to_json(v, config)?);
399            }
400            serde_json::Value::Object(map)
401        }
402        Value::Nothing { .. } => serde_json::Value::Null,
403        other => serde_json::Value::String(other.to_expanded_string("", config)),
404    })
405}
406
407fn update_metadata(metadata: Option<PipelineMetadata>) -> Option<PipelineMetadata> {
408    metadata
409        .map(|md| md.with_content_type(Some("text/event-stream".into())))
410        .or_else(|| {
411            Some(PipelineMetadata::default().with_content_type(Some("text/event-stream".into())))
412        })
413}
414
/// The `.reverse-proxy` command: responds by forwarding the request to a backend.
#[derive(Clone)]
pub struct ReverseProxyCommand;

impl Default for ReverseProxyCommand {
    /// Equivalent to [`ReverseProxyCommand::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ReverseProxyCommand {
    /// Create the command; it carries no state.
    pub fn new() -> Self {
        Self
    }
}
429
430impl Command for ReverseProxyCommand {
431    fn name(&self) -> &str {
432        ".reverse-proxy"
433    }
434
435    fn description(&self) -> &str {
436        "Forward HTTP requests to a backend server"
437    }
438
439    fn signature(&self) -> Signature {
440        Signature::build(".reverse-proxy")
441            .required("target_url", SyntaxShape::String, "backend URL to proxy to")
442            .optional(
443                "config",
444                SyntaxShape::Record(vec![]),
445                "optional configuration (headers, preserve_host, strip_prefix, query)",
446            )
447            .input_output_types(vec![(Type::Any, Type::Nothing)])
448            .category(Category::Custom("http".into()))
449    }
450
451    fn run(
452        &self,
453        engine_state: &EngineState,
454        stack: &mut Stack,
455        call: &Call,
456        input: PipelineData,
457    ) -> Result<PipelineData, ShellError> {
458        let target_url: String = call.req(engine_state, stack, 0)?;
459
460        // Convert input pipeline data to bytes for request body
461        let request_body = match input {
462            PipelineData::Empty => Vec::new(),
463            PipelineData::Value(value, _) => crate::response::value_to_bytes(value),
464            PipelineData::ByteStream(stream, _) => {
465                // Collect all bytes from the stream
466                let mut body_bytes = Vec::new();
467                if let Some(mut reader) = stream.reader() {
468                    loop {
469                        let mut buffer = vec![0; 8192];
470                        match reader.read(&mut buffer) {
471                            Ok(0) => break, // EOF
472                            Ok(n) => {
473                                buffer.truncate(n);
474                                body_bytes.extend_from_slice(&buffer);
475                            }
476                            Err(_) => break,
477                        }
478                    }
479                }
480                body_bytes
481            }
482            PipelineData::ListStream(stream, _) => {
483                // Convert list stream to JSON array
484                let items: Vec<_> = stream.into_iter().collect();
485                let json_value = serde_json::Value::Array(
486                    items
487                        .into_iter()
488                        .map(|v| crate::response::value_to_json(&v))
489                        .collect(),
490                );
491                serde_json::to_string(&json_value)
492                    .unwrap_or_default()
493                    .into_bytes()
494            }
495        };
496
497        // Parse optional config
498        let config = call.opt::<Value>(engine_state, stack, 1);
499
500        let mut headers = HashMap::new();
501        let mut preserve_host = true;
502        let mut strip_prefix: Option<String> = None;
503        let mut query: Option<HashMap<String, String>> = None;
504
505        if let Ok(Some(config_value)) = config {
506            if let Ok(record) = config_value.as_record() {
507                // Extract headers
508                if let Some(headers_value) = record.get("headers") {
509                    if let Ok(headers_record) = headers_value.as_record() {
510                        for (k, v) in headers_record.iter() {
511                            let header_value = match v {
512                                Value::String { val, .. } => {
513                                    crate::response::HeaderValue::Single(val.clone())
514                                }
515                                Value::List { vals, .. } => {
516                                    let strings: Vec<String> = vals
517                                        .iter()
518                                        .filter_map(|v| v.as_str().ok())
519                                        .map(|s| s.to_string())
520                                        .collect();
521                                    crate::response::HeaderValue::Multiple(strings)
522                                }
523                                _ => continue, // Skip non-string/non-list values
524                            };
525                            headers.insert(k.clone(), header_value);
526                        }
527                    }
528                }
529
530                // Extract preserve_host
531                if let Some(preserve_host_value) = record.get("preserve_host") {
532                    if let Ok(ph) = preserve_host_value.as_bool() {
533                        preserve_host = ph;
534                    }
535                }
536
537                // Extract strip_prefix
538                if let Some(strip_prefix_value) = record.get("strip_prefix") {
539                    if let Ok(prefix) = strip_prefix_value.as_str() {
540                        strip_prefix = Some(prefix.to_string());
541                    }
542                }
543
544                // Extract query
545                if let Some(query_value) = record.get("query") {
546                    if let Ok(query_record) = query_value.as_record() {
547                        let mut query_map = HashMap::new();
548                        for (k, v) in query_record.iter() {
549                            if let Ok(v_str) = v.as_str() {
550                                query_map.insert(k.clone(), v_str.to_string());
551                            }
552                        }
553                        query = Some(query_map);
554                    }
555                }
556            }
557        }
558
559        let response = Response {
560            status: 200,
561            headers: HashMap::new(),
562            body_type: ResponseBodyType::ReverseProxy {
563                target_url,
564                headers,
565                preserve_host,
566                strip_prefix,
567                request_body,
568                query,
569            },
570        };
571
572        RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
573            if let Some(tx) = tx.borrow_mut().take() {
574                tx.send(response).map_err(|_| {
575                    ShellError::Generic(GenericError::new(
576                        "Failed to send response",
577                        "Channel closed",
578                        call.head,
579                    ))
580                })?;
581            }
582            Ok(())
583        })?;
584
585        Ok(PipelineData::Empty)
586    }
587}
588
/// The `.mj` command: renders a minijinja template with the pipeline input as context.
#[derive(Clone)]
pub struct MjCommand {
    /// Store used to resolve `--topic` templates; `None` when no store was supplied.
    #[cfg(feature = "cross-stream")]
    store: Option<xs::store::Store>,
}

impl Default for MjCommand {
    /// Equivalent to [`MjCommand::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl MjCommand {
    /// Create the command without a store.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "cross-stream")]
            store: None,
        }
    }

    /// Create the command with a store, so templates can be loaded by topic.
    #[cfg(feature = "cross-stream")]
    pub fn with_store(store: xs::store::Store) -> Self {
        Self { store: Some(store) }
    }
}
614
impl Command for MjCommand {
    fn name(&self) -> &str {
        ".mj"
    }

    fn description(&self) -> &str {
        "Render a minijinja template with context from input"
    }

    /// `.mj [file] [--inline <string>] [--topic <string>]`; record in, string out.
    fn signature(&self) -> Signature {
        Signature::build(".mj")
            .optional("file", SyntaxShape::String, "template file path")
            .named(
                "inline",
                SyntaxShape::String,
                "inline template string",
                Some('i'),
            )
            .named(
                "topic",
                SyntaxShape::String,
                "load template from a store topic",
                Some('t'),
            )
            .input_output_types(vec![(Type::Record(vec![].into()), Type::String)])
            .category(Category::Custom("http".into()))
    }

    /// Render the selected template with the pipeline input as its context.
    ///
    /// Exactly one template source must be given: a file path, `--inline`, or
    /// `--topic`. A fresh environment with HTML auto-escaping is built on every call.
    ///
    /// # Errors
    ///
    /// Fails when zero or several sources are given, when the input is a stream,
    /// when `--topic` is used without a store (or without the `cross-stream`
    /// feature), when the topic has no content, or when the template cannot be
    /// loaded, parsed or rendered.
    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        let head = call.head;
        let file: Option<String> = call.opt(engine_state, stack, 0)?;
        let inline: Option<String> = call.get_flag(engine_state, stack, "inline")?;
        let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;

        // Count how many template sources were supplied; exactly one is allowed.
        let mode_count = file.is_some() as u8 + inline.is_some() as u8 + topic.is_some() as u8;
        if mode_count > 1 {
            return Err(ShellError::Generic(GenericError::new(
                "Cannot combine file, --inline, and --topic",
                "use exactly one of: file path, --inline, or --topic",
                head,
            )));
        }
        if mode_count == 0 {
            return Err(ShellError::Generic(GenericError::new(
                "No template specified",
                "provide a file path, --inline, or --topic",
                head,
            )));
        }

        // Get context from input
        let context = match input {
            PipelineData::Value(val, _) => nu_value_to_minijinja(&val),
            PipelineData::Empty => minijinja::Value::from(()),
            _ => {
                return Err(ShellError::TypeMismatch {
                    err_message: "expected record input".into(),
                    span: head,
                });
            }
        };

        // Set up environment and get template
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::Html);
        let tmpl = if let Some(ref path) = file {
            // File mode: resolve from filesystem only
            let path = std::path::Path::new(path);
            // Relative paths are resolved against the current working directory.
            let abs_path = if path.is_absolute() {
                path.to_path_buf()
            } else {
                std::env::current_dir().unwrap_or_default().join(path)
            };
            // The loader is rooted at the file's directory and the template is
            // then requested by its file name.
            if let Some(parent) = abs_path.parent() {
                env.set_loader(path_loader(parent));
            }
            let name = abs_path
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("template");
            env.get_template(name).map_err(|e| {
                ShellError::Generic(GenericError::new(
                    format!("Template error: {e}"),
                    e.to_string(),
                    head,
                ))
            })?
        } else if let Some(ref topic_name) = topic {
            // Topic mode: resolve from store only
            #[cfg(feature = "cross-stream")]
            {
                let store = self.store.as_ref().ok_or_else(|| {
                    ShellError::Generic(GenericError::new(
                        "--topic requires --store",
                        "server must be started with --store to use --topic",
                        head,
                    ))
                })?;
                let source = load_topic_content(store, topic_name).ok_or_else(|| {
                    ShellError::Generic(GenericError::new(
                        format!("Topic not found: {topic_name}"),
                        "no content in store for this topic",
                        head,
                    ))
                })?;
                // Templates requested by name from within this one are looked up
                // as store topics as well.
                let topic_store = store.clone();
                env.set_loader(move |name: &str| Ok(load_topic_content(&topic_store, name)));
                env.add_template_owned(topic_name.clone(), source)
                    .map_err(|e| {
                        ShellError::Generic(GenericError::new(
                            format!("Template parse error: {e}"),
                            e.to_string(),
                            head,
                        ))
                    })?;
                env.get_template(topic_name).map_err(|e| {
                    ShellError::Generic(GenericError::new(
                        format!("Template error: {e}"),
                        e.to_string(),
                        head,
                    ))
                })?
            }
            #[cfg(not(feature = "cross-stream"))]
            {
                // Mark the binding as used in builds without store support.
                let _ = topic_name;
                return Err(ShellError::Generic(GenericError::new(
                    "--topic requires cross-stream feature",
                    "built without store support",
                    head,
                )));
            }
        } else {
            // Inline mode: self-contained, no loader
            // `mode_count` was exactly 1 and neither `file` nor `topic` is set,
            // so `inline` must be `Some` here.
            let source = inline.unwrap();
            env.add_template_owned("template".to_string(), source)
                .map_err(|e| {
                    ShellError::Generic(GenericError::new(
                        format!("Template parse error: {e}"),
                        e.to_string(),
                        head,
                    ))
                })?;
            env.get_template("template").map_err(|e| {
                ShellError::Generic(GenericError::new(
                    format!("Failed to get template: {e}"),
                    e.to_string(),
                    head,
                ))
            })?
        };

        let rendered = tmpl.render(&context).map_err(|e| {
            ShellError::Generic(GenericError::new(
                format!("Template render error: {e}"),
                e.to_string(),
                head,
            ))
        })?;

        Ok(Value::string(rendered, head).into_pipeline_data())
    }
}
784
785/// Convert a nu_protocol::Value to a minijinja::Value via serde_json
786fn nu_value_to_minijinja(val: &Value) -> minijinja::Value {
787    let json = value_to_json(val, &Config::default()).unwrap_or(serde_json::Value::Null);
788    minijinja::Value::from_serialize(&json)
789}
790
791// === .mj compile ===
792
/// The `.mj compile` command: compiles a minijinja template into a reusable handle.
#[derive(Clone)]
pub struct MjCompileCommand {
    /// Store used to resolve `--topic` templates; `None` when no store was supplied.
    #[cfg(feature = "cross-stream")]
    store: Option<xs::store::Store>,
}

impl Default for MjCompileCommand {
    /// Equivalent to [`MjCompileCommand::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl MjCompileCommand {
    /// Create the command without a store.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "cross-stream")]
            store: None,
        }
    }

    /// Create the command with a store, so templates can be loaded by topic.
    #[cfg(feature = "cross-stream")]
    pub fn with_store(store: xs::store::Store) -> Self {
        Self { store: Some(store) }
    }
}
818
819impl Command for MjCompileCommand {
820    fn name(&self) -> &str {
821        ".mj compile"
822    }
823
824    fn description(&self) -> &str {
825        "Compile a minijinja template, returning a reusable compiled template"
826    }
827
828    fn signature(&self) -> Signature {
829        Signature::build(".mj compile")
830            .optional("file", SyntaxShape::String, "template file path")
831            .named(
832                "inline",
833                SyntaxShape::Any,
834                "inline template (string or {__html: string})",
835                Some('i'),
836            )
837            .named(
838                "topic",
839                SyntaxShape::String,
840                "load template from a store topic",
841                Some('t'),
842            )
843            .input_output_types(vec![(
844                Type::Nothing,
845                Type::Custom("CompiledTemplate".into()),
846            )])
847            .category(Category::Custom("http".into()))
848    }
849
850    fn run(
851        &self,
852        engine_state: &EngineState,
853        stack: &mut Stack,
854        call: &Call,
855        _input: PipelineData,
856    ) -> Result<PipelineData, ShellError> {
857        let head = call.head;
858        let file: Option<String> = call.opt(engine_state, stack, 0)?;
859        let inline: Option<Value> = call.get_flag(engine_state, stack, "inline")?;
860        let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
861
862        // Extract template string from --inline value (string or {__html: string})
863        let inline_str: Option<String> = match &inline {
864            None => None,
865            Some(val) => match val {
866                Value::String { val, .. } => Some(val.clone()),
867                Value::Record { val, .. } => {
868                    if let Some(html_val) = val.get("__html") {
869                        match html_val {
870                            Value::String { val, .. } => Some(val.clone()),
871                            _ => {
872                                return Err(ShellError::Generic(GenericError::new(
873                                    "__html must be a string",
874                                    "expected string value",
875                                    head,
876                                )));
877                            }
878                        }
879                    } else {
880                        return Err(ShellError::Generic(GenericError::new(
881                            "Record must have __html field",
882                            "expected {__html: string}",
883                            head,
884                        )));
885                    }
886                }
887                _ => {
888                    return Err(ShellError::Generic(GenericError::new(
889                        "--inline must be string or {__html: string}",
890                        "invalid type",
891                        head,
892                    )));
893                }
894            },
895        };
896
897        let mode_count = file.is_some() as u8 + inline_str.is_some() as u8 + topic.is_some() as u8;
898        if mode_count > 1 {
899            return Err(ShellError::Generic(GenericError::new(
900                "Cannot combine file, --inline, and --topic",
901                "use exactly one of: file path, --inline, or --topic",
902                head,
903            )));
904        }
905        if mode_count == 0 {
906            return Err(ShellError::Generic(GenericError::new(
907                "No template specified",
908                "provide a file path, --inline, or --topic",
909                head,
910            )));
911        }
912
913        let hash = if let Some(ref path) = file {
914            // File mode: filesystem only
915            let path = std::path::Path::new(path);
916            let abs_path = if path.is_absolute() {
917                path.to_path_buf()
918            } else {
919                std::env::current_dir().unwrap_or_default().join(path)
920            };
921            let base_dir = abs_path.parent().unwrap_or(&abs_path).to_path_buf();
922            let source = std::fs::read_to_string(&abs_path).map_err(|e| {
923                ShellError::Generic(GenericError::new(
924                    format!("Failed to read template file: {e}"),
925                    "could not read file",
926                    head,
927                ))
928            })?;
929            compile_template(&source, &base_dir)
930        } else if let Some(ref topic_name) = topic {
931            // Topic mode: store only
932            #[cfg(feature = "cross-stream")]
933            {
934                let store = self.store.as_ref().ok_or_else(|| {
935                    ShellError::Generic(GenericError::new(
936                        "--topic requires --store",
937                        "server must be started with --store to use --topic",
938                        head,
939                    ))
940                })?;
941                let source = load_topic_content(store, topic_name).ok_or_else(|| {
942                    ShellError::Generic(GenericError::new(
943                        format!("Topic not found: {topic_name}"),
944                        "no content in store for this topic",
945                        head,
946                    ))
947                })?;
948                let topic_store = store.clone();
949                // Use a synthetic base_dir for cache key uniqueness
950                let base_dir = std::path::PathBuf::from(format!("__topic__/{topic_name}"));
951                compile_template_with_loader(&source, &base_dir, move |name: &str| {
952                    Ok(load_topic_content(&topic_store, name))
953                })
954            }
955            #[cfg(not(feature = "cross-stream"))]
956            {
957                let _ = topic_name;
958                return Err(ShellError::Generic(GenericError::new(
959                    "--topic requires cross-stream feature",
960                    "built without store support",
961                    head,
962                )));
963            }
964        } else {
965            // Inline mode: self-contained, no loader
966            let source = inline_str.unwrap();
967            let base_dir = std::path::PathBuf::from("__inline__");
968            compile_template_with_loader(&source, &base_dir, |_| Ok(None))
969        };
970
971        let hash = hash.map_err(|e| {
972            ShellError::Generic(GenericError::new(
973                format!("Template compile error: {e}"),
974                e.to_string(),
975                head,
976            ))
977        })?;
978
979        Ok(Value::custom(Box::new(CompiledTemplate { hash }), head).into_pipeline_data())
980    }
981}
982
983// === .mj render ===
984
/// Stateless command type for `.mj render`.
#[derive(Clone, Default)]
pub struct MjRenderCommand;

impl MjRenderCommand {
    /// Creates the command; equivalent to `Default::default()`.
    pub fn new() -> Self {
        MjRenderCommand
    }
}
999
1000impl Command for MjRenderCommand {
1001    fn name(&self) -> &str {
1002        ".mj render"
1003    }
1004
1005    fn description(&self) -> &str {
1006        "Render a compiled minijinja template with context from input"
1007    }
1008
1009    fn signature(&self) -> Signature {
1010        Signature::build(".mj render")
1011            .required(
1012                "template",
1013                SyntaxShape::Any,
1014                "compiled template from '.mj compile'",
1015            )
1016            .input_output_types(vec![(Type::Record(vec![].into()), Type::String)])
1017            .category(Category::Custom("http".into()))
1018    }
1019
1020    fn run(
1021        &self,
1022        engine_state: &EngineState,
1023        stack: &mut Stack,
1024        call: &Call,
1025        input: PipelineData,
1026    ) -> Result<PipelineData, ShellError> {
1027        let head = call.head;
1028        let template_val: Value = call.req(engine_state, stack, 0)?;
1029
1030        // Extract CompiledTemplate from the value
1031        let compiled = match template_val {
1032            Value::Custom { val, .. } => val
1033                .as_any()
1034                .downcast_ref::<CompiledTemplate>()
1035                .ok_or_else(|| ShellError::TypeMismatch {
1036                    err_message: "expected CompiledTemplate".into(),
1037                    span: head,
1038                })?
1039                .clone(),
1040            _ => {
1041                return Err(ShellError::TypeMismatch {
1042                    err_message: "expected CompiledTemplate from '.mj compile'".into(),
1043                    span: head,
1044                });
1045            }
1046        };
1047
1048        // Get context from input
1049        let context = match input {
1050            PipelineData::Value(val, _) => nu_value_to_minijinja(&val),
1051            PipelineData::Empty => minijinja::Value::from(()),
1052            _ => {
1053                return Err(ShellError::TypeMismatch {
1054                    err_message: "expected record input".into(),
1055                    span: head,
1056                });
1057            }
1058        };
1059
1060        // Render template
1061        let rendered = compiled.render(&context).map_err(|e| {
1062            ShellError::Generic(GenericError::new(
1063                format!("Template render error: {e}"),
1064                e.to_string(),
1065                head,
1066            ))
1067        })?;
1068
1069        Ok(Value::string(rendered, head).into_pipeline_data())
1070    }
1071}
1072
1073// === Syntax Highlighting ===
1074
1075struct SyntaxHighlighter {
1076    syntax_set: SyntaxSet,
1077}
1078
1079impl SyntaxHighlighter {
1080    fn new() -> Self {
1081        const SYNTAX_SET: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/syntax_set.bin"));
1082        let syntax_set = syntect::dumps::from_binary(SYNTAX_SET);
1083        Self { syntax_set }
1084    }
1085
1086    fn highlight(&self, code: &str, lang: Option<&str>) -> String {
1087        let syntax = match lang {
1088            Some(lang) => self
1089                .syntax_set
1090                .find_syntax_by_token(lang)
1091                .or_else(|| self.syntax_set.find_syntax_by_extension(lang)),
1092            None => None,
1093        }
1094        .unwrap_or_else(|| self.syntax_set.find_syntax_plain_text());
1095
1096        let mut html_generator = ClassedHTMLGenerator::new_with_class_style(
1097            syntax,
1098            &self.syntax_set,
1099            ClassStyle::Spaced,
1100        );
1101
1102        for line in LinesWithEndings::from(code) {
1103            let _ = html_generator.parse_html_for_line_which_includes_newline(line);
1104        }
1105
1106        html_generator.finalize()
1107    }
1108
1109    fn list_syntaxes(&self) -> Vec<(String, Vec<String>)> {
1110        self.syntax_set
1111            .syntaxes()
1112            .iter()
1113            .map(|s| (s.name.clone(), s.file_extensions.clone()))
1114            .collect()
1115    }
1116}
1117
1118static HIGHLIGHTER: OnceLock<SyntaxHighlighter> = OnceLock::new();
1119
1120fn get_highlighter() -> &'static SyntaxHighlighter {
1121    HIGHLIGHTER.get_or_init(SyntaxHighlighter::new)
1122}
1123
1124// === .highlight command ===
1125
/// Stateless command type for `.highlight`.
#[derive(Clone, Default)]
pub struct HighlightCommand;

impl HighlightCommand {
    /// Creates the command; equivalent to `Default::default()`.
    pub fn new() -> Self {
        HighlightCommand
    }
}
1140
1141impl Command for HighlightCommand {
1142    fn name(&self) -> &str {
1143        ".highlight"
1144    }
1145
1146    fn description(&self) -> &str {
1147        "Syntax highlight code, outputting HTML with CSS classes"
1148    }
1149
1150    fn signature(&self) -> Signature {
1151        Signature::build(".highlight")
1152            .required("lang", SyntaxShape::String, "language for highlighting")
1153            .input_output_types(vec![(Type::String, Type::record())])
1154            .category(Category::Custom("http".into()))
1155    }
1156
1157    fn run(
1158        &self,
1159        engine_state: &EngineState,
1160        stack: &mut Stack,
1161        call: &Call,
1162        input: PipelineData,
1163    ) -> Result<PipelineData, ShellError> {
1164        let head = call.head;
1165        let lang: String = call.req(engine_state, stack, 0)?;
1166
1167        let code = match input {
1168            PipelineData::Value(Value::String { val, .. }, _) => val,
1169            PipelineData::ByteStream(stream, _) => stream.into_string()?,
1170            _ => {
1171                return Err(ShellError::TypeMismatch {
1172                    err_message: "expected string input".into(),
1173                    span: head,
1174                });
1175            }
1176        };
1177
1178        let highlighter = get_highlighter();
1179        let html = highlighter.highlight(&code, Some(&lang));
1180
1181        Ok(Value::record(
1182            nu_protocol::record! {
1183                "__html" => Value::string(html, head),
1184            },
1185            head,
1186        )
1187        .into_pipeline_data())
1188    }
1189}
1190
1191// === .highlight theme command ===
1192
/// Stateless command type for `.highlight theme`.
#[derive(Clone, Default)]
pub struct HighlightThemeCommand;

impl HighlightThemeCommand {
    /// Creates the command; equivalent to `Default::default()`.
    pub fn new() -> Self {
        HighlightThemeCommand
    }
}
1207
1208impl Command for HighlightThemeCommand {
1209    fn name(&self) -> &str {
1210        ".highlight theme"
1211    }
1212
1213    fn description(&self) -> &str {
1214        "List available themes or get CSS for a specific theme"
1215    }
1216
1217    fn signature(&self) -> Signature {
1218        Signature::build(".highlight theme")
1219            .optional("name", SyntaxShape::String, "theme name (omit to list all)")
1220            .input_output_types(vec![
1221                (Type::Nothing, Type::List(Box::new(Type::String))),
1222                (Type::Nothing, Type::String),
1223            ])
1224            .category(Category::Custom("http".into()))
1225    }
1226
1227    fn run(
1228        &self,
1229        engine_state: &EngineState,
1230        stack: &mut Stack,
1231        call: &Call,
1232        _input: PipelineData,
1233    ) -> Result<PipelineData, ShellError> {
1234        let head = call.head;
1235        let name: Option<String> = call.opt(engine_state, stack, 0)?;
1236
1237        let assets = syntect_assets::assets::HighlightingAssets::from_binary();
1238
1239        match name {
1240            None => {
1241                let themes: Vec<Value> = assets.themes().map(|t| Value::string(t, head)).collect();
1242                Ok(Value::list(themes, head).into_pipeline_data())
1243            }
1244            Some(theme_name) => {
1245                let theme = assets.get_theme(&theme_name);
1246                let css = syntect::html::css_for_theme_with_class_style(theme, ClassStyle::Spaced)
1247                    .map_err(|e| {
1248                        ShellError::Generic(GenericError::new(
1249                            format!("Failed to generate CSS: {e}"),
1250                            e.to_string(),
1251                            head,
1252                        ))
1253                    })?;
1254                Ok(Value::string(css, head).into_pipeline_data())
1255            }
1256        }
1257    }
1258}
1259
1260// === .highlight lang command ===
1261
/// Stateless command type for `.highlight lang`.
#[derive(Clone, Default)]
pub struct HighlightLangCommand;

impl HighlightLangCommand {
    /// Creates the command; equivalent to `Default::default()`.
    pub fn new() -> Self {
        HighlightLangCommand
    }
}
1276
1277impl Command for HighlightLangCommand {
1278    fn name(&self) -> &str {
1279        ".highlight lang"
1280    }
1281
1282    fn description(&self) -> &str {
1283        "List available languages for syntax highlighting"
1284    }
1285
1286    fn signature(&self) -> Signature {
1287        Signature::build(".highlight lang")
1288            .input_output_types(vec![(Type::Nothing, Type::List(Box::new(Type::record())))])
1289            .category(Category::Custom("http".into()))
1290    }
1291
1292    fn run(
1293        &self,
1294        _engine_state: &EngineState,
1295        _stack: &mut Stack,
1296        call: &Call,
1297        _input: PipelineData,
1298    ) -> Result<PipelineData, ShellError> {
1299        let head = call.head;
1300        let highlighter = get_highlighter();
1301        let langs: Vec<Value> = highlighter
1302            .list_syntaxes()
1303            .into_iter()
1304            .map(|(name, exts)| {
1305                Value::record(
1306                    nu_protocol::record! {
1307                        "name" => Value::string(name, head),
1308                        "extensions" => Value::list(
1309                            exts.into_iter().map(|e| Value::string(e, head)).collect(),
1310                            head
1311                        ),
1312                    },
1313                    head,
1314                )
1315            })
1316            .collect();
1317        Ok(Value::list(langs, head).into_pipeline_data())
1318    }
1319}
1320
1321// === .md command ===
1322
1323use pulldown_cmark::{html, CodeBlockKind, Event, Parser as MarkdownParser, Tag, TagEnd};
1324
/// Stateless command type for `.md`.
#[derive(Clone, Default)]
pub struct MdCommand;

impl MdCommand {
    /// Creates the command; equivalent to `Default::default()`.
    pub fn new() -> Self {
        MdCommand
    }
}
1339
1340impl Command for MdCommand {
1341    fn name(&self) -> &str {
1342        ".md"
1343    }
1344
1345    fn description(&self) -> &str {
1346        "Convert Markdown to HTML with syntax-highlighted code blocks"
1347    }
1348
1349    fn signature(&self) -> Signature {
1350        Signature::build(".md")
1351            .input_output_types(vec![
1352                (Type::String, Type::record()),
1353                (Type::record(), Type::record()),
1354            ])
1355            .category(Category::Custom("http".into()))
1356    }
1357
1358    fn run(
1359        &self,
1360        _engine_state: &EngineState,
1361        _stack: &mut Stack,
1362        call: &Call,
1363        input: PipelineData,
1364    ) -> Result<PipelineData, ShellError> {
1365        let head = call.head;
1366
1367        // Determine if input is trusted ({__html: ...}) or untrusted (plain string)
1368        let (markdown, trusted) = match input.into_value(head)? {
1369            Value::String { val, .. } => (val, false),
1370            Value::Record { val, .. } => {
1371                if let Some(html_val) = val.get("__html") {
1372                    (html_val.as_str()?.to_string(), true)
1373                } else {
1374                    return Err(ShellError::TypeMismatch {
1375                        err_message: "expected string or {__html: ...}".into(),
1376                        span: head,
1377                    });
1378                }
1379            }
1380            other => {
1381                return Err(ShellError::TypeMismatch {
1382                    err_message: format!(
1383                        "expected string or {{__html: ...}}, got {}",
1384                        other.get_type()
1385                    ),
1386                    span: head,
1387                });
1388            }
1389        };
1390
1391        let highlighter = get_highlighter();
1392
1393        let mut in_code_block = false;
1394        let mut current_code = String::new();
1395        let mut current_lang: Option<String> = None;
1396
1397        let mut options = pulldown_cmark::Options::empty();
1398        options.insert(pulldown_cmark::Options::ENABLE_TABLES);
1399        options.insert(pulldown_cmark::Options::ENABLE_STRIKETHROUGH);
1400        options.insert(pulldown_cmark::Options::ENABLE_TASKLISTS);
1401        options.insert(pulldown_cmark::Options::ENABLE_FOOTNOTES);
1402        options.insert(pulldown_cmark::Options::ENABLE_HEADING_ATTRIBUTES);
1403        options.insert(pulldown_cmark::Options::ENABLE_GFM);
1404        options.insert(pulldown_cmark::Options::ENABLE_DEFINITION_LIST);
1405        let parser = MarkdownParser::new_ext(&markdown, options).map(|event| match event {
1406            Event::Start(Tag::CodeBlock(kind)) => {
1407                in_code_block = true;
1408                current_code.clear();
1409                current_lang = match kind {
1410                    CodeBlockKind::Fenced(info) => {
1411                        let lang = info.split_whitespace().next().unwrap_or("");
1412                        if lang.is_empty() {
1413                            None
1414                        } else {
1415                            Some(lang.to_string())
1416                        }
1417                    }
1418                    CodeBlockKind::Indented => None,
1419                };
1420                Event::Text("".into())
1421            }
1422            Event::End(TagEnd::CodeBlock) => {
1423                in_code_block = false;
1424                let highlighted = highlighter.highlight(&current_code, current_lang.as_deref());
1425                let mut html_out = String::new();
1426                html_out.push_str("<pre><code");
1427                if let Some(lang) = &current_lang {
1428                    let lang = v_htmlescape::escape(lang);
1429                    html_out.push_str(&format!(" class=\"language-{lang}\""));
1430                }
1431                html_out.push('>');
1432                html_out.push_str(&highlighted);
1433                html_out.push_str("</code></pre>");
1434                Event::Html(html_out.into())
1435            }
1436            Event::Text(text) => {
1437                if in_code_block {
1438                    current_code.push_str(&text);
1439                    Event::Text("".into())
1440                } else {
1441                    Event::Text(text)
1442                }
1443            }
1444            // Escape raw HTML if input is untrusted
1445            Event::Html(html) => {
1446                if trusted {
1447                    Event::Html(html)
1448                } else {
1449                    Event::Text(html) // push_html escapes Text
1450                }
1451            }
1452            Event::InlineHtml(html) => {
1453                if trusted {
1454                    Event::InlineHtml(html)
1455                } else {
1456                    Event::Text(html)
1457                }
1458            }
1459            e => e,
1460        });
1461
1462        let mut html_output = String::new();
1463        html::push_html(&mut html_output, parser);
1464
1465        Ok(Value::record(
1466            nu_protocol::record! {
1467                "__html" => Value::string(html_output, head),
1468            },
1469            head,
1470        )
1471        .into_pipeline_data())
1472    }
1473}
1474
1475// === .print command ===
1476
/// Stateless command type for the `print` command that writes to the http-nu log.
#[derive(Clone, Default)]
pub struct PrintCommand;

impl PrintCommand {
    /// Creates the command; equivalent to `Default::default()`.
    pub fn new() -> Self {
        PrintCommand
    }
}
1491
1492impl Command for PrintCommand {
1493    fn name(&self) -> &str {
1494        "print"
1495    }
1496
1497    fn description(&self) -> &str {
1498        "Print the given values to the http-nu logging system."
1499    }
1500
1501    fn extra_description(&self) -> &str {
1502        r#"This command outputs to http-nu's logging system rather than stdout/stderr.
1503Messages appear in both human-readable and JSONL output modes.
1504
1505`print` may be used inside blocks of code (e.g.: hooks) to display text during execution without interfering with the pipeline."#
1506    }
1507
1508    fn search_terms(&self) -> Vec<&str> {
1509        vec!["display"]
1510    }
1511
1512    fn signature(&self) -> Signature {
1513        Signature::build("print")
1514            .input_output_types(vec![
1515                (Type::Nothing, Type::Nothing),
1516                (Type::Any, Type::Nothing),
1517            ])
1518            .allow_variants_without_examples(true)
1519            .rest("rest", SyntaxShape::Any, "the values to print")
1520            .switch(
1521                "no-newline",
1522                "print without inserting a newline for the line ending",
1523                Some('n'),
1524            )
1525            .switch("stderr", "print to stderr instead of stdout", Some('e'))
1526            .switch(
1527                "raw",
1528                "print without formatting (including binary data)",
1529                Some('r'),
1530            )
1531            .category(Category::Strings)
1532    }
1533
1534    fn run(
1535        &self,
1536        engine_state: &EngineState,
1537        stack: &mut Stack,
1538        call: &Call,
1539        input: PipelineData,
1540    ) -> Result<PipelineData, ShellError> {
1541        let args: Vec<Value> = call.rest(engine_state, stack, 0)?;
1542        let no_newline = call.has_flag(engine_state, stack, "no-newline")?;
1543        let config = stack.get_config(engine_state);
1544
1545        let format_value = |val: &Value| -> String { val.to_expanded_string(" ", &config) };
1546
1547        if !args.is_empty() {
1548            let messages: Vec<String> = args.iter().map(format_value).collect();
1549            let message = if no_newline {
1550                messages.join("")
1551            } else {
1552                messages.join("\n")
1553            };
1554            log_print(&message);
1555        } else if !input.is_nothing() {
1556            let message = match input {
1557                PipelineData::Value(val, _) => format_value(&val),
1558                PipelineData::ListStream(stream, _) => {
1559                    let vals: Vec<String> = stream.into_iter().map(|v| format_value(&v)).collect();
1560                    vals.join("\n")
1561                }
1562                PipelineData::ByteStream(stream, _) => stream.into_string()?,
1563                PipelineData::Empty => String::new(),
1564            };
1565            if !message.is_empty() {
1566                log_print(&message);
1567            }
1568        }
1569
1570        Ok(PipelineData::empty())
1571    }
1572
1573    fn examples(&self) -> Vec<Example<'_>> {
1574        vec![
1575            Example {
1576                description: "Print 'hello world'",
1577                example: r#"print "hello world""#,
1578                result: None,
1579            },
1580            Example {
1581                description: "Print the sum of 2 and 3",
1582                example: r#"print (2 + 3)"#,
1583                result: None,
1584            },
1585        ]
1586    }
1587}
1588
1589// === .run: parse, compile, and evaluate a nushell pipeline string in a sandbox ===
1590
/// Stateless command type for `.run`.
#[derive(Clone, Default)]
pub struct RunNuCommand;

impl RunNuCommand {
    /// Creates the command; equivalent to `Default::default()`.
    pub fn new() -> Self {
        RunNuCommand
    }
}
1605
1606impl Command for RunNuCommand {
1607    fn name(&self) -> &str {
1608        ".run"
1609    }
1610
1611    fn description(&self) -> &str {
1612        "Parse, compile, and evaluate a nushell pipeline string in a sandboxed engine state."
1613    }
1614
1615    fn extra_description(&self) -> &str {
1616        r#"The submitted script is parsed and compiled against a clone of the current engine state.
1617Any defs, lets, or modules introduced by the script live only for the duration of the call --
1618they do not leak back into the calling engine. Pipeline input is forwarded to the script as `$in`;
1619the caller's local bindings and environment are not visible inside the sandbox."#
1620    }
1621
1622    fn signature(&self) -> Signature {
1623        Signature::build(".run")
1624            .input_output_types(vec![(Type::Any, Type::Any)])
1625            .required("script", SyntaxShape::String, "nushell source to evaluate")
1626            .category(Category::Experimental)
1627    }
1628
1629    fn run(
1630        &self,
1631        engine_state: &EngineState,
1632        stack: &mut Stack,
1633        call: &Call,
1634        input: PipelineData,
1635    ) -> Result<PipelineData, ShellError> {
1636        use nu_engine::eval_block_with_early_return;
1637        use nu_parser::parse;
1638        use nu_protocol::{debugger::WithoutDebug, engine::StateWorkingSet, format_cli_error};
1639
1640        let script: String = call.req(engine_state, stack, 0)?;
1641
1642        let mut ws = StateWorkingSet::new(engine_state);
1643        let block = parse(&mut ws, Some("<.run>"), script.as_bytes(), false);
1644
1645        if !ws.parse_errors.is_empty() {
1646            let mut combined = String::new();
1647            for err in &ws.parse_errors {
1648                let se = ShellError::Generic(GenericError::new(
1649                    "Parse error",
1650                    format!("{err}"),
1651                    err.span(),
1652                ));
1653                combined.push_str(&format_cli_error(None, &ws, &se, None));
1654                combined.push('\n');
1655            }
1656            let plain = nu_utils::strip_ansi_string_likely(combined);
1657            return Err(ShellError::Generic(GenericError::new_internal(plain, "")));
1658        }
1659        if !ws.compile_errors.is_empty() {
1660            let mut combined = String::new();
1661            for err in &ws.compile_errors {
1662                let se = ShellError::Generic(GenericError::new_internal(
1663                    format!("Compile error {err}"),
1664                    "",
1665                ));
1666                combined.push_str(&format_cli_error(None, &ws, &se, None));
1667                combined.push('\n');
1668            }
1669            let plain = nu_utils::strip_ansi_string_likely(combined);
1670            return Err(ShellError::Generic(GenericError::new_internal(plain, "")));
1671        }
1672
1673        let mut sandbox = engine_state.clone();
1674        sandbox.merge_delta(ws.render())?;
1675
1676        let mut sub_stack = Stack::new();
1677        eval_block_with_early_return::<WithoutDebug>(&sandbox, &mut sub_stack, &block, input)
1678            .map(|exec| exec.body)
1679    }
1680}
1681
1682// === .bus pub / .bus sub: ephemeral local pub/sub ===
1683
1684#[derive(Clone)]
1685pub struct BusPubCommand {
1686    bus: Arc<Bus>,
1687}
1688
1689impl BusPubCommand {
1690    pub fn new(bus: Arc<Bus>) -> Self {
1691        Self { bus }
1692    }
1693}
1694
1695impl Command for BusPubCommand {
1696    fn name(&self) -> &str {
1697        ".bus pub"
1698    }
1699
1700    fn description(&self) -> &str {
1701        "Publish a value to an ephemeral in-process topic."
1702    }
1703
1704    fn signature(&self) -> Signature {
1705        Signature::build(".bus pub")
1706            .input_output_types(vec![(Type::Any, Type::Nothing)])
1707            .required("topic", SyntaxShape::String, "topic to publish to")
1708            .category(Category::Experimental)
1709    }
1710
1711    fn run(
1712        &self,
1713        engine_state: &EngineState,
1714        stack: &mut Stack,
1715        call: &Call,
1716        input: PipelineData,
1717    ) -> Result<PipelineData, ShellError> {
1718        let topic: String = call.req(engine_state, stack, 0)?;
1719        let value = input.into_value(call.head)?;
1720        self.bus.publish(topic, value);
1721        Ok(PipelineData::empty())
1722    }
1723}
1724
1725#[derive(Clone)]
1726pub struct BusSubCommand {
1727    bus: Arc<Bus>,
1728}
1729
1730impl BusSubCommand {
1731    pub fn new(bus: Arc<Bus>) -> Self {
1732        Self { bus }
1733    }
1734}
1735
1736impl Command for BusSubCommand {
1737    fn name(&self) -> &str {
1738        ".bus sub"
1739    }
1740
1741    fn description(&self) -> &str {
1742        "Subscribe to ephemeral in-process topics; yields {topic, value} records."
1743    }
1744
1745    fn extra_description(&self) -> &str {
1746        r#"With no argument, yields every published event. With a glob pattern (e.g. "tab-abc.*"),
1747yields only events whose topic matches. `*` matches any run of characters including dots."#
1748    }
1749
1750    fn signature(&self) -> Signature {
1751        Signature::build(".bus sub")
1752            .input_output_types(vec![(Type::Nothing, Type::Any)])
1753            .optional(
1754                "pattern",
1755                SyntaxShape::String,
1756                "glob pattern; omit for all topics",
1757            )
1758            .category(Category::Experimental)
1759    }
1760
1761    fn run(
1762        &self,
1763        engine_state: &EngineState,
1764        stack: &mut Stack,
1765        call: &Call,
1766        _input: PipelineData,
1767    ) -> Result<PipelineData, ShellError> {
1768        use nu_protocol::{record, ListStream, Signals};
1769
1770        let pattern: Option<String> = call.opt(engine_state, stack, 0)?;
1771        let span = call.head;
1772        let signals = engine_state.signals().clone();
1773
1774        let mut sub = self.bus.subscribe(pattern);
1775
1776        let (tx, rx) = std::sync::mpsc::channel::<crate::bus::BusEvent>();
1777        std::thread::spawn(move || {
1778            let rt = match tokio::runtime::Runtime::new() {
1779                Ok(rt) => rt,
1780                Err(_) => return,
1781            };
1782            rt.block_on(async move {
1783                while let Some(ev) = sub.recv().await {
1784                    if tx.send(ev).is_err() {
1785                        break;
1786                    }
1787                }
1788            });
1789        });
1790
1791        let stream = ListStream::new(
1792            std::iter::from_fn(move || {
1793                use std::sync::mpsc::RecvTimeoutError;
1794                use std::time::Duration;
1795                loop {
1796                    if signals.interrupted() {
1797                        return None;
1798                    }
1799                    match rx.recv_timeout(Duration::from_millis(100)) {
1800                        Ok(ev) => {
1801                            let rec = record! {
1802                                "topic" => Value::string(ev.topic, span),
1803                                "value" => ev.value,
1804                            };
1805                            return Some(Value::record(rec, span));
1806                        }
1807                        Err(RecvTimeoutError::Timeout) => continue,
1808                        Err(RecvTimeoutError::Disconnected) => return None,
1809                    }
1810                }
1811            }),
1812            span,
1813            Signals::empty(),
1814        );
1815
1816        Ok(PipelineData::ListStream(stream, None))
1817    }
1818}