Skip to main content

http_nu/
commands.rs

1use crate::bus::Bus;
2use crate::logging::log_print;
3use crate::response::{Response, ResponseBodyType};
4use nu_engine::command_prelude::*;
5use nu_protocol::{
6    shell_error::generic::GenericError, ByteStream, ByteStreamType, Category, Config, CustomValue,
7    PipelineData, PipelineMetadata, ShellError, Signature, Span, SyntaxShape, Type, Value,
8};
9use serde::{Deserialize, Serialize};
10use std::cell::RefCell;
11use std::collections::HashMap;
12use std::io::Read;
13use std::path::PathBuf;
14use tokio::sync::oneshot;
15
16use minijinja::{path_loader, AutoEscape, Environment};
17use std::sync::{Arc, OnceLock, RwLock};
18
19use syntect::html::{ClassStyle, ClassedHTMLGenerator};
20use syntect::parsing::SyntaxSet;
21use syntect::util::LinesWithEndings;
22
23// === Template Cache ===
24
/// Map from a template's 128-bit hash to the shared minijinja environment that
/// holds its compiled form, guarded by a reader/writer lock.
type TemplateCache = RwLock<HashMap<u128, Arc<Environment<'static>>>>;

/// Process-wide template cache. It starts unset and is created on first access
/// through `get_cache`.
static TEMPLATE_CACHE: OnceLock<TemplateCache> = OnceLock::new();
28
29fn get_cache() -> &'static TemplateCache {
30    TEMPLATE_CACHE.get_or_init(|| RwLock::new(HashMap::new()))
31}
32
33fn hash_source_and_path(source: &str, base_dir: &std::path::Path) -> u128 {
34    let mut data = source.as_bytes().to_vec();
35    data.extend_from_slice(base_dir.to_string_lossy().as_bytes());
36    xxhash_rust::xxh3::xxh3_128(&data)
37}
38
39/// Compile template and insert into cache. Returns hash.
40fn compile_template(source: &str, base_dir: &std::path::Path) -> Result<u128, minijinja::Error> {
41    compile_template_with_loader(source, base_dir, path_loader(base_dir))
42}
43
44/// Compile template with a custom loader and insert into cache. Returns hash.
45fn compile_template_with_loader<F>(
46    source: &str,
47    base_dir: &std::path::Path,
48    loader: F,
49) -> Result<u128, minijinja::Error>
50where
51    F: Fn(&str) -> Result<Option<String>, minijinja::Error> + Send + Sync + 'static,
52{
53    let hash = hash_source_and_path(source, base_dir);
54
55    let mut cache = get_cache().write().unwrap();
56    if cache.contains_key(&hash) {
57        return Ok(hash);
58    }
59
60    let mut env = Environment::new();
61    env.set_auto_escape_callback(|_| AutoEscape::Html);
62    env.set_loader(loader);
63    env.add_template_owned("template".to_string(), source.to_string())?;
64    cache.insert(hash, Arc::new(env));
65    Ok(hash)
66}
67
68/// Get compiled template from cache by hash.
69fn get_compiled(hash: u128) -> Option<Arc<Environment<'static>>> {
70    get_cache().read().unwrap().get(&hash).map(Arc::clone)
71}
72
/// Fetches the content of the last frame stored under `topic` as UTF-8 text.
///
/// Returns `None` when the read yields no frame, the frame carries no content
/// hash, the content cannot be read, or the bytes are not valid UTF-8.
#[cfg(feature = "cross-stream")]
fn load_topic_content(store: &xs::store::Store, topic: &str) -> Option<String> {
    let query = xs::store::ReadOptions::builder()
        .follow(xs::store::FollowOption::Off)
        .topic(String::from(topic))
        .last(1_usize)
        .build();
    let newest = store.read_sync(query).last()?;
    let content_hash = newest.hash?;
    store
        .cas_read_sync(&content_hash)
        .ok()
        .and_then(|raw| String::from_utf8(raw).ok())
}
86
87// === CompiledTemplate CustomValue ===
88
/// Handle to a template stored in the global template cache.
///
/// The value carries only the cache key; the compiled template itself is looked
/// up in the cache each time the handle is rendered.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompiledTemplate {
    /// Key of the compiled template in the global template cache.
    hash: u128,
}
93
94impl CompiledTemplate {
95    /// Render this template with the given context
96    pub fn render(&self, context: &minijinja::Value) -> Result<String, minijinja::Error> {
97        let env = get_compiled(self.hash).expect("template not in cache");
98        let tmpl = env.get_template("template")?;
99        tmpl.render(context)
100    }
101}
102
103#[typetag::serde]
104impl CustomValue for CompiledTemplate {
105    fn clone_value(&self, span: Span) -> Value {
106        Value::custom(Box::new(self.clone()), span)
107    }
108
109    fn type_name(&self) -> String {
110        "CompiledTemplate".into()
111    }
112
113    fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
114        Ok(Value::string(format!("{:032x}", self.hash), span))
115    }
116
117    fn as_any(&self) -> &dyn std::any::Any {
118        self
119    }
120
121    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
122        self
123    }
124}
125
thread_local! {
    /// Per-thread slot for the one-shot sender used to hand a `Response` back
    /// to whoever is waiting on the matching receiver. Commands in this file
    /// `take()` the sender before sending, so at most one response goes out
    /// per installed sender.
    // NOTE(review): the code that installs the sender is not in this part of
    // the file; presumably the request handler sets it before running the
    // script. Confirm against the caller.
    pub static RESPONSE_TX: RefCell<Option<oneshot::Sender<Response>>> = const { RefCell::new(None) };
}
129
/// Stateless marker type for the `.static` command.
#[derive(Clone)]
pub struct StaticCommand;

impl Default for StaticCommand {
    /// Same as [`StaticCommand::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl StaticCommand {
    /// Creates the command.
    pub fn new() -> Self {
        StaticCommand
    }
}
144
145impl Command for StaticCommand {
146    fn name(&self) -> &str {
147        ".static"
148    }
149
150    fn description(&self) -> &str {
151        "Serve static files from a directory"
152    }
153
154    fn signature(&self) -> Signature {
155        Signature::build(".static")
156            .required("root", SyntaxShape::String, "root directory path")
157            .required("path", SyntaxShape::String, "request path")
158            .named(
159                "fallback",
160                SyntaxShape::String,
161                "fallback file when request missing",
162                None,
163            )
164            .input_output_types(vec![(Type::Nothing, Type::Nothing)])
165            .category(Category::Custom("http".into()))
166    }
167
168    fn run(
169        &self,
170        engine_state: &EngineState,
171        stack: &mut Stack,
172        call: &Call,
173        _input: PipelineData,
174    ) -> Result<PipelineData, ShellError> {
175        let root: String = call.req(engine_state, stack, 0)?;
176        let path: String = call.req(engine_state, stack, 1)?;
177
178        let fallback: Option<String> = call.get_flag(engine_state, stack, "fallback")?;
179
180        let response = Response {
181            status: 200,
182            headers: HashMap::new(),
183            body_type: ResponseBodyType::Static {
184                root: PathBuf::from(root),
185                path,
186                fallback,
187            },
188        };
189
190        RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
191            if let Some(tx) = tx.borrow_mut().take() {
192                tx.send(response).map_err(|_| {
193                    ShellError::Generic(GenericError::new(
194                        "Failed to send response",
195                        "Channel closed",
196                        call.head,
197                    ))
198                })?;
199            }
200            Ok(())
201        })?;
202
203        Ok(PipelineData::Empty)
204    }
205}
206
/// Terminator written after every emitted event-stream line and once more at
/// the end of each event.
const LINE_ENDING: &str = "\n";
208
/// Stateless marker type for the `to sse` command, which formats records as
/// `text/event-stream` text.
#[derive(Clone)]
pub struct ToSse;
211
212impl Command for ToSse {
213    fn name(&self) -> &str {
214        "to sse"
215    }
216
217    fn signature(&self) -> Signature {
218        Signature::build("to sse")
219            .input_output_types(vec![
220                (Type::record(), Type::String),
221                (Type::List(Box::new(Type::record())), Type::String),
222            ])
223            .category(Category::Formats)
224    }
225
226    fn description(&self) -> &str {
227        "Convert records into text/event-stream format"
228    }
229
230    fn search_terms(&self) -> Vec<&str> {
231        vec!["sse", "server", "event"]
232    }
233
234    fn examples(&self) -> Vec<Example<'_>> {
235        vec![Example {
236            description: "Convert a record into a server-sent event",
237            example: "{data: 'hello'} | to sse",
238            result: Some(Value::test_string("data: hello\n\n")),
239        }]
240    }
241
242    fn run(
243        &self,
244        engine_state: &EngineState,
245        stack: &mut Stack,
246        call: &Call,
247        input: PipelineData,
248    ) -> Result<PipelineData, ShellError> {
249        let head = call.head;
250        let config = stack.get_config(engine_state);
251        match input {
252            PipelineData::ListStream(stream, meta) => {
253                let span = stream.span();
254                let cfg = config.clone();
255                let iter = stream
256                    .into_iter()
257                    .map(move |val| event_to_string(&cfg, val));
258                let stream = ByteStream::from_result_iter(
259                    iter,
260                    span,
261                    engine_state.signals().clone(),
262                    ByteStreamType::String,
263                );
264                Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
265            }
266            PipelineData::Value(Value::List { vals, .. }, meta) => {
267                let cfg = config.clone();
268                let iter = vals.into_iter().map(move |val| event_to_string(&cfg, val));
269                let span = head;
270                let stream = ByteStream::from_result_iter(
271                    iter,
272                    span,
273                    engine_state.signals().clone(),
274                    ByteStreamType::String,
275                );
276                Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
277            }
278            PipelineData::Value(val, meta) => {
279                let out = event_to_string(&config, val)?;
280                Ok(
281                    Value::string(out, head)
282                        .into_pipeline_data_with_metadata(update_metadata(meta)),
283                )
284            }
285            PipelineData::Empty => Ok(PipelineData::Value(
286                Value::string(String::new(), head),
287                update_metadata(None),
288            )),
289            PipelineData::ByteStream(..) => Err(ShellError::TypeMismatch {
290                err_message: "expected record input".into(),
291                span: head,
292            }),
293        }
294    }
295}
296
297fn emit_data_lines(out: &mut String, s: &str) {
298    for line in s.lines() {
299        out.push_str("data: ");
300        out.push_str(line);
301        out.push_str(LINE_ENDING);
302    }
303}
304
305#[allow(clippy::result_large_err)]
306fn value_to_data_string(val: &Value, config: &Config) -> Result<String, ShellError> {
307    match val {
308        Value::String { val, .. } => Ok(val.clone()),
309        _ => {
310            let json_value = value_to_json(val, config).map_err(|err| {
311                ShellError::Generic(GenericError::new(
312                    err.to_string(),
313                    "failed to serialize json",
314                    Span::unknown(),
315                ))
316            })?;
317            serde_json::to_string(&json_value).map_err(|err| {
318                ShellError::Generic(GenericError::new(
319                    err.to_string(),
320                    "failed to serialize json",
321                    Span::unknown(),
322                ))
323            })
324        }
325    }
326}
327
328#[allow(clippy::result_large_err)]
329fn event_to_string(config: &Config, val: Value) -> Result<String, ShellError> {
330    let span = val.span();
331    let rec = match val {
332        Value::Record { val, .. } => val,
333        // Propagate the original error instead of creating a new "expected record" error
334        Value::Error { error, .. } => return Err(*error),
335        other => {
336            return Err(ShellError::TypeMismatch {
337                err_message: format!("expected record, got {}", other.get_type()),
338                span,
339            })
340        }
341    };
342    let mut out = String::new();
343    // A `comment` field emits an SSE comment line (`: text`). Used for idle
344    // keepalives; the browser ignores it but proxies keep the stream open.
345    if let Some(comment) = rec.get("comment") {
346        if !matches!(comment, Value::Nothing { .. }) {
347            out.push_str(": ");
348            out.push_str(&comment.to_expanded_string("", config));
349            out.push_str(LINE_ENDING);
350        }
351    }
352    if let Some(event) = rec.get("event") {
353        if !matches!(event, Value::Nothing { .. }) {
354            out.push_str("event: ");
355            out.push_str(&event.to_expanded_string("", config));
356            out.push_str(LINE_ENDING);
357        }
358    }
359    if let Some(id) = rec.get("id") {
360        if !matches!(id, Value::Nothing { .. }) {
361            out.push_str("id: ");
362            out.push_str(&id.to_expanded_string("", config));
363            out.push_str(LINE_ENDING);
364        }
365    }
366    if let Some(retry) = rec.get("retry") {
367        if !matches!(retry, Value::Nothing { .. }) {
368            out.push_str("retry: ");
369            out.push_str(&retry.to_expanded_string("", config));
370            out.push_str(LINE_ENDING);
371        }
372    }
373    if let Some(data) = rec.get("data") {
374        if !matches!(data, Value::Nothing { .. }) {
375            match data {
376                Value::List { vals, .. } => {
377                    for item in vals {
378                        emit_data_lines(&mut out, &value_to_data_string(item, config)?);
379                    }
380                }
381                _ => {
382                    emit_data_lines(&mut out, &value_to_data_string(data, config)?);
383                }
384            }
385        }
386    }
387    out.push_str(LINE_ENDING);
388    Ok(out)
389}
390
391fn value_to_json(val: &Value, config: &Config) -> serde_json::Result<serde_json::Value> {
392    Ok(match val {
393        Value::Bool { val, .. } => serde_json::Value::Bool(*val),
394        Value::Int { val, .. } => serde_json::Value::from(*val),
395        Value::Float { val, .. } => serde_json::Number::from_f64(*val)
396            .map(serde_json::Value::Number)
397            .unwrap_or(serde_json::Value::Null),
398        Value::String { val, .. } => serde_json::Value::String(val.clone()),
399        Value::List { vals, .. } => serde_json::Value::Array(
400            vals.iter()
401                .map(|v| value_to_json(v, config))
402                .collect::<Result<Vec<_>, _>>()?,
403        ),
404        Value::Record { val, .. } => {
405            let mut map = serde_json::Map::new();
406            for (k, v) in val.iter() {
407                map.insert(k.clone(), value_to_json(v, config)?);
408            }
409            serde_json::Value::Object(map)
410        }
411        Value::Nothing { .. } => serde_json::Value::Null,
412        other => serde_json::Value::String(other.to_expanded_string("", config)),
413    })
414}
415
416fn update_metadata(metadata: Option<PipelineMetadata>) -> Option<PipelineMetadata> {
417    metadata
418        .map(|md| md.with_content_type(Some("text/event-stream".into())))
419        .or_else(|| {
420            Some(PipelineMetadata::default().with_content_type(Some("text/event-stream".into())))
421        })
422}
423
/// Stateless marker type for the `.reverse-proxy` command.
#[derive(Clone)]
pub struct ReverseProxyCommand;

impl Default for ReverseProxyCommand {
    /// Same as [`ReverseProxyCommand::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ReverseProxyCommand {
    /// Creates the command.
    pub fn new() -> Self {
        ReverseProxyCommand
    }
}
438
439impl Command for ReverseProxyCommand {
440    fn name(&self) -> &str {
441        ".reverse-proxy"
442    }
443
444    fn description(&self) -> &str {
445        "Forward HTTP requests to a backend server"
446    }
447
448    fn signature(&self) -> Signature {
449        Signature::build(".reverse-proxy")
450            .required("target_url", SyntaxShape::String, "backend URL to proxy to")
451            .optional(
452                "config",
453                SyntaxShape::Record(vec![]),
454                "optional configuration (headers, preserve_host, strip_prefix, query)",
455            )
456            .input_output_types(vec![(Type::Any, Type::Nothing)])
457            .category(Category::Custom("http".into()))
458    }
459
460    fn run(
461        &self,
462        engine_state: &EngineState,
463        stack: &mut Stack,
464        call: &Call,
465        input: PipelineData,
466    ) -> Result<PipelineData, ShellError> {
467        let target_url: String = call.req(engine_state, stack, 0)?;
468
469        // Convert input pipeline data to bytes for request body
470        let request_body = match input {
471            PipelineData::Empty => Vec::new(),
472            PipelineData::Value(value, _) => crate::response::value_to_bytes(value),
473            PipelineData::ByteStream(stream, _) => {
474                // Collect all bytes from the stream
475                let mut body_bytes = Vec::new();
476                if let Some(mut reader) = stream.reader() {
477                    loop {
478                        let mut buffer = vec![0; 8192];
479                        match reader.read(&mut buffer) {
480                            Ok(0) => break, // EOF
481                            Ok(n) => {
482                                buffer.truncate(n);
483                                body_bytes.extend_from_slice(&buffer);
484                            }
485                            Err(_) => break,
486                        }
487                    }
488                }
489                body_bytes
490            }
491            PipelineData::ListStream(stream, _) => {
492                // Convert list stream to JSON array
493                let items: Vec<_> = stream.into_iter().collect();
494                let json_value = serde_json::Value::Array(
495                    items
496                        .into_iter()
497                        .map(|v| crate::response::value_to_json(&v))
498                        .collect(),
499                );
500                serde_json::to_string(&json_value)
501                    .unwrap_or_default()
502                    .into_bytes()
503            }
504        };
505
506        // Parse optional config
507        let config = call.opt::<Value>(engine_state, stack, 1);
508
509        let mut headers = HashMap::new();
510        let mut preserve_host = true;
511        let mut strip_prefix: Option<String> = None;
512        let mut query: Option<HashMap<String, String>> = None;
513
514        if let Ok(Some(config_value)) = config {
515            if let Ok(record) = config_value.as_record() {
516                // Extract headers
517                if let Some(headers_value) = record.get("headers") {
518                    if let Ok(headers_record) = headers_value.as_record() {
519                        for (k, v) in headers_record.iter() {
520                            let header_value = match v {
521                                Value::String { val, .. } => {
522                                    crate::response::HeaderValue::Single(val.clone())
523                                }
524                                Value::List { vals, .. } => {
525                                    let strings: Vec<String> = vals
526                                        .iter()
527                                        .filter_map(|v| v.as_str().ok())
528                                        .map(|s| s.to_string())
529                                        .collect();
530                                    crate::response::HeaderValue::Multiple(strings)
531                                }
532                                _ => continue, // Skip non-string/non-list values
533                            };
534                            headers.insert(k.clone(), header_value);
535                        }
536                    }
537                }
538
539                // Extract preserve_host
540                if let Some(preserve_host_value) = record.get("preserve_host") {
541                    if let Ok(ph) = preserve_host_value.as_bool() {
542                        preserve_host = ph;
543                    }
544                }
545
546                // Extract strip_prefix
547                if let Some(strip_prefix_value) = record.get("strip_prefix") {
548                    if let Ok(prefix) = strip_prefix_value.as_str() {
549                        strip_prefix = Some(prefix.to_string());
550                    }
551                }
552
553                // Extract query
554                if let Some(query_value) = record.get("query") {
555                    if let Ok(query_record) = query_value.as_record() {
556                        let mut query_map = HashMap::new();
557                        for (k, v) in query_record.iter() {
558                            if let Ok(v_str) = v.as_str() {
559                                query_map.insert(k.clone(), v_str.to_string());
560                            }
561                        }
562                        query = Some(query_map);
563                    }
564                }
565            }
566        }
567
568        let response = Response {
569            status: 200,
570            headers: HashMap::new(),
571            body_type: ResponseBodyType::ReverseProxy {
572                target_url,
573                headers,
574                preserve_host,
575                strip_prefix,
576                request_body,
577                query,
578            },
579        };
580
581        RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
582            if let Some(tx) = tx.borrow_mut().take() {
583                tx.send(response).map_err(|_| {
584                    ShellError::Generic(GenericError::new(
585                        "Failed to send response",
586                        "Channel closed",
587                        call.head,
588                    ))
589                })?;
590            }
591            Ok(())
592        })?;
593
594        Ok(PipelineData::Empty)
595    }
596}
597
/// State for the `.mj` command.
#[derive(Clone)]
pub struct MjCommand {
    /// Store that `--topic` templates are loaded from, when one was supplied.
    #[cfg(feature = "cross-stream")]
    store: Option<xs::store::Store>,
}

impl Default for MjCommand {
    /// Same as [`MjCommand::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl MjCommand {
    /// Creates the command without a store.
    pub fn new() -> Self {
        MjCommand {
            #[cfg(feature = "cross-stream")]
            store: None,
        }
    }

    /// Creates the command backed by `store`.
    #[cfg(feature = "cross-stream")]
    pub fn with_store(store: xs::store::Store) -> Self {
        let store = Some(store);
        MjCommand { store }
    }
}
623
impl Command for MjCommand {
    fn name(&self) -> &str {
        ".mj"
    }

    fn description(&self) -> &str {
        "Render a minijinja template with context from input"
    }

    /// `.mj [file] [--inline <string>] [--topic <string>]`; declared as taking
    /// a record and producing a string.
    fn signature(&self) -> Signature {
        Signature::build(".mj")
            .optional("file", SyntaxShape::String, "template file path")
            .named(
                "inline",
                SyntaxShape::String,
                "inline template string",
                Some('i'),
            )
            .named(
                "topic",
                SyntaxShape::String,
                "load template from a store topic",
                Some('t'),
            )
            .input_output_types(vec![(Type::Record(vec![].into()), Type::String)])
            .category(Category::Custom("http".into()))
    }

    /// Renders a template selected by exactly one of the `file` argument,
    /// `--inline` or `--topic`, using the pipeline input as the context, and
    /// returns the rendered text as a string value.
    ///
    /// Errors are reported when zero or more than one template source is
    /// given, when the input is a stream, when the template cannot be loaded
    /// or parsed, and when rendering fails.
    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        let head = call.head;
        let file: Option<String> = call.opt(engine_state, stack, 0)?;
        let inline: Option<String> = call.get_flag(engine_state, stack, "inline")?;
        let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;

        // Count how many template sources were supplied; exactly one is required.
        let mode_count = file.is_some() as u8 + inline.is_some() as u8 + topic.is_some() as u8;
        if mode_count > 1 {
            return Err(ShellError::Generic(GenericError::new(
                "Cannot combine file, --inline, and --topic",
                "use exactly one of: file path, --inline, or --topic",
                head,
            )));
        }
        if mode_count == 0 {
            return Err(ShellError::Generic(GenericError::new(
                "No template specified",
                "provide a file path, --inline, or --topic",
                head,
            )));
        }

        // Get context from input. Any single value is accepted here, not only
        // records; empty input renders with a unit context. Streams are rejected.
        let context = match input {
            PipelineData::Value(val, _) => nu_value_to_minijinja(&val),
            PipelineData::Empty => minijinja::Value::from(()),
            _ => {
                return Err(ShellError::TypeMismatch {
                    err_message: "expected record input".into(),
                    span: head,
                });
            }
        };

        // Set up environment and get template. A fresh environment is built on
        // every call, with HTML auto-escaping enabled for every template name.
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::Html);
        let tmpl = if let Some(ref path) = file {
            // File mode: resolve from filesystem only
            let path = std::path::Path::new(path);
            // Relative paths are resolved against the current directory; if that
            // cannot be determined, the join starts from an empty path.
            let abs_path = if path.is_absolute() {
                path.to_path_buf()
            } else {
                std::env::current_dir().unwrap_or_default().join(path)
            };
            // The loader is rooted at the template's own directory.
            if let Some(parent) = abs_path.parent() {
                env.set_loader(path_loader(parent));
            }
            // No template is registered in this mode, so the file is requested
            // from the environment by its file name.
            let name = abs_path
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("template");
            env.get_template(name).map_err(|e| {
                ShellError::Generic(GenericError::new(
                    format!("Template error: {e}"),
                    e.to_string(),
                    head,
                ))
            })?
        } else if let Some(ref topic_name) = topic {
            // Topic mode: resolve from store only
            #[cfg(feature = "cross-stream")]
            {
                let store = self.store.as_ref().ok_or_else(|| {
                    ShellError::Generic(GenericError::new(
                        "--topic requires --store",
                        "server must be started with --store to use --topic",
                        head,
                    ))
                })?;
                let source = load_topic_content(store, topic_name).ok_or_else(|| {
                    ShellError::Generic(GenericError::new(
                        format!("Topic not found: {topic_name}"),
                        "no content in store for this topic",
                        head,
                    ))
                })?;
                // Templates referenced by name from this one are looked up as
                // topics in the same store.
                let topic_store = store.clone();
                env.set_loader(move |name: &str| Ok(load_topic_content(&topic_store, name)));
                env.add_template_owned(topic_name.clone(), source)
                    .map_err(|e| {
                        ShellError::Generic(GenericError::new(
                            format!("Template parse error: {e}"),
                            e.to_string(),
                            head,
                        ))
                    })?;
                env.get_template(topic_name).map_err(|e| {
                    ShellError::Generic(GenericError::new(
                        format!("Template error: {e}"),
                        e.to_string(),
                        head,
                    ))
                })?
            }
            #[cfg(not(feature = "cross-stream"))]
            {
                // Mark the binding as used so this build does not warn about it.
                let _ = topic_name;
                return Err(ShellError::Generic(GenericError::new(
                    "--topic requires cross-stream feature",
                    "built without store support",
                    head,
                )));
            }
        } else {
            // Inline mode: self-contained, no loader
            // `inline` is set here: exactly one source was given and the other
            // two branches did not match.
            let source = inline.unwrap();
            env.add_template_owned("template".to_string(), source)
                .map_err(|e| {
                    ShellError::Generic(GenericError::new(
                        format!("Template parse error: {e}"),
                        e.to_string(),
                        head,
                    ))
                })?;
            env.get_template("template").map_err(|e| {
                ShellError::Generic(GenericError::new(
                    format!("Failed to get template: {e}"),
                    e.to_string(),
                    head,
                ))
            })?
        };

        let rendered = tmpl.render(&context).map_err(|e| {
            ShellError::Generic(GenericError::new(
                format!("Template render error: {e}"),
                e.to_string(),
                head,
            ))
        })?;

        Ok(Value::string(rendered, head).into_pipeline_data())
    }
}
793
794/// Convert a nu_protocol::Value to a minijinja::Value via serde_json
795fn nu_value_to_minijinja(val: &Value) -> minijinja::Value {
796    let json = value_to_json(val, &Config::default()).unwrap_or(serde_json::Value::Null);
797    minijinja::Value::from_serialize(&json)
798}
799
800// === .mj compile ===
801
/// State for the `.mj compile` command.
#[derive(Clone)]
pub struct MjCompileCommand {
    /// Optional store handle, present only in `cross-stream` builds.
    #[cfg(feature = "cross-stream")]
    store: Option<xs::store::Store>,
}

impl Default for MjCompileCommand {
    /// Same as [`MjCompileCommand::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl MjCompileCommand {
    /// Creates the command without a store.
    pub fn new() -> Self {
        MjCompileCommand {
            #[cfg(feature = "cross-stream")]
            store: None,
        }
    }

    /// Creates the command backed by `store`.
    #[cfg(feature = "cross-stream")]
    pub fn with_store(store: xs::store::Store) -> Self {
        let store = Some(store);
        MjCompileCommand { store }
    }
}
827
828impl Command for MjCompileCommand {
829    fn name(&self) -> &str {
830        ".mj compile"
831    }
832
833    fn description(&self) -> &str {
834        "Compile a minijinja template, returning a reusable compiled template"
835    }
836
837    fn signature(&self) -> Signature {
838        Signature::build(".mj compile")
839            .optional("file", SyntaxShape::String, "template file path")
840            .named(
841                "inline",
842                SyntaxShape::Any,
843                "inline template (string or {__html: string})",
844                Some('i'),
845            )
846            .named(
847                "topic",
848                SyntaxShape::String,
849                "load template from a store topic",
850                Some('t'),
851            )
852            .input_output_types(vec![(
853                Type::Nothing,
854                Type::Custom("CompiledTemplate".into()),
855            )])
856            .category(Category::Custom("http".into()))
857    }
858
859    fn run(
860        &self,
861        engine_state: &EngineState,
862        stack: &mut Stack,
863        call: &Call,
864        _input: PipelineData,
865    ) -> Result<PipelineData, ShellError> {
866        let head = call.head;
867        let file: Option<String> = call.opt(engine_state, stack, 0)?;
868        let inline: Option<Value> = call.get_flag(engine_state, stack, "inline")?;
869        let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
870
871        // Extract template string from --inline value (string or {__html: string})
872        let inline_str: Option<String> = match &inline {
873            None => None,
874            Some(val) => match val {
875                Value::String { val, .. } => Some(val.clone()),
876                Value::Record { val, .. } => {
877                    if let Some(html_val) = val.get("__html") {
878                        match html_val {
879                            Value::String { val, .. } => Some(val.clone()),
880                            _ => {
881                                return Err(ShellError::Generic(GenericError::new(
882                                    "__html must be a string",
883                                    "expected string value",
884                                    head,
885                                )));
886                            }
887                        }
888                    } else {
889                        return Err(ShellError::Generic(GenericError::new(
890                            "Record must have __html field",
891                            "expected {__html: string}",
892                            head,
893                        )));
894                    }
895                }
896                _ => {
897                    return Err(ShellError::Generic(GenericError::new(
898                        "--inline must be string or {__html: string}",
899                        "invalid type",
900                        head,
901                    )));
902                }
903            },
904        };
905
906        let mode_count = file.is_some() as u8 + inline_str.is_some() as u8 + topic.is_some() as u8;
907        if mode_count > 1 {
908            return Err(ShellError::Generic(GenericError::new(
909                "Cannot combine file, --inline, and --topic",
910                "use exactly one of: file path, --inline, or --topic",
911                head,
912            )));
913        }
914        if mode_count == 0 {
915            return Err(ShellError::Generic(GenericError::new(
916                "No template specified",
917                "provide a file path, --inline, or --topic",
918                head,
919            )));
920        }
921
922        let hash = if let Some(ref path) = file {
923            // File mode: filesystem only
924            let path = std::path::Path::new(path);
925            let abs_path = if path.is_absolute() {
926                path.to_path_buf()
927            } else {
928                std::env::current_dir().unwrap_or_default().join(path)
929            };
930            let base_dir = abs_path.parent().unwrap_or(&abs_path).to_path_buf();
931            let source = std::fs::read_to_string(&abs_path).map_err(|e| {
932                ShellError::Generic(GenericError::new(
933                    format!("Failed to read template file: {e}"),
934                    "could not read file",
935                    head,
936                ))
937            })?;
938            compile_template(&source, &base_dir)
939        } else if let Some(ref topic_name) = topic {
940            // Topic mode: store only
941            #[cfg(feature = "cross-stream")]
942            {
943                let store = self.store.as_ref().ok_or_else(|| {
944                    ShellError::Generic(GenericError::new(
945                        "--topic requires --store",
946                        "server must be started with --store to use --topic",
947                        head,
948                    ))
949                })?;
950                let source = load_topic_content(store, topic_name).ok_or_else(|| {
951                    ShellError::Generic(GenericError::new(
952                        format!("Topic not found: {topic_name}"),
953                        "no content in store for this topic",
954                        head,
955                    ))
956                })?;
957                let topic_store = store.clone();
958                // Use a synthetic base_dir for cache key uniqueness
959                let base_dir = std::path::PathBuf::from(format!("__topic__/{topic_name}"));
960                compile_template_with_loader(&source, &base_dir, move |name: &str| {
961                    Ok(load_topic_content(&topic_store, name))
962                })
963            }
964            #[cfg(not(feature = "cross-stream"))]
965            {
966                let _ = topic_name;
967                return Err(ShellError::Generic(GenericError::new(
968                    "--topic requires cross-stream feature",
969                    "built without store support",
970                    head,
971                )));
972            }
973        } else {
974            // Inline mode: self-contained, no loader
975            let source = inline_str.unwrap();
976            let base_dir = std::path::PathBuf::from("__inline__");
977            compile_template_with_loader(&source, &base_dir, |_| Ok(None))
978        };
979
980        let hash = hash.map_err(|e| {
981            ShellError::Generic(GenericError::new(
982                format!("Template compile error: {e}"),
983                e.to_string(),
984                head,
985            ))
986        })?;
987
988        Ok(Value::custom(Box::new(CompiledTemplate { hash }), head).into_pipeline_data())
989    }
990}
991
992// === .mj render ===
993
/// Stateless handle for the `.mj render` command.
#[derive(Clone)]
pub struct MjRenderCommand;

impl MjRenderCommand {
    /// Creates the command value; there is no state to initialise.
    pub fn new() -> Self {
        MjRenderCommand
    }
}

impl Default for MjRenderCommand {
    /// Same as [`MjRenderCommand::new`].
    fn default() -> Self {
        MjRenderCommand::new()
    }
}
1008
1009impl Command for MjRenderCommand {
1010    fn name(&self) -> &str {
1011        ".mj render"
1012    }
1013
1014    fn description(&self) -> &str {
1015        "Render a compiled minijinja template with context from input"
1016    }
1017
1018    fn signature(&self) -> Signature {
1019        Signature::build(".mj render")
1020            .required(
1021                "template",
1022                SyntaxShape::Any,
1023                "compiled template from '.mj compile'",
1024            )
1025            .input_output_types(vec![(Type::Record(vec![].into()), Type::String)])
1026            .category(Category::Custom("http".into()))
1027    }
1028
1029    fn run(
1030        &self,
1031        engine_state: &EngineState,
1032        stack: &mut Stack,
1033        call: &Call,
1034        input: PipelineData,
1035    ) -> Result<PipelineData, ShellError> {
1036        let head = call.head;
1037        let template_val: Value = call.req(engine_state, stack, 0)?;
1038
1039        // Extract CompiledTemplate from the value
1040        let compiled = match template_val {
1041            Value::Custom { val, .. } => val
1042                .as_any()
1043                .downcast_ref::<CompiledTemplate>()
1044                .ok_or_else(|| ShellError::TypeMismatch {
1045                    err_message: "expected CompiledTemplate".into(),
1046                    span: head,
1047                })?
1048                .clone(),
1049            _ => {
1050                return Err(ShellError::TypeMismatch {
1051                    err_message: "expected CompiledTemplate from '.mj compile'".into(),
1052                    span: head,
1053                });
1054            }
1055        };
1056
1057        // Get context from input
1058        let context = match input {
1059            PipelineData::Value(val, _) => nu_value_to_minijinja(&val),
1060            PipelineData::Empty => minijinja::Value::from(()),
1061            _ => {
1062                return Err(ShellError::TypeMismatch {
1063                    err_message: "expected record input".into(),
1064                    span: head,
1065                });
1066            }
1067        };
1068
1069        // Render template
1070        let rendered = compiled.render(&context).map_err(|e| {
1071            ShellError::Generic(GenericError::new(
1072                format!("Template render error: {e}"),
1073                e.to_string(),
1074                head,
1075            ))
1076        })?;
1077
1078        Ok(Value::string(rendered, head).into_pipeline_data())
1079    }
1080}
1081
1082// === Syntax Highlighting ===
1083
1084struct SyntaxHighlighter {
1085    syntax_set: SyntaxSet,
1086}
1087
1088impl SyntaxHighlighter {
1089    fn new() -> Self {
1090        const SYNTAX_SET: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/syntax_set.bin"));
1091        let syntax_set = syntect::dumps::from_binary(SYNTAX_SET);
1092        Self { syntax_set }
1093    }
1094
1095    fn highlight(&self, code: &str, lang: Option<&str>) -> String {
1096        let syntax = match lang {
1097            Some(lang) => self
1098                .syntax_set
1099                .find_syntax_by_token(lang)
1100                .or_else(|| self.syntax_set.find_syntax_by_extension(lang)),
1101            None => None,
1102        }
1103        .unwrap_or_else(|| self.syntax_set.find_syntax_plain_text());
1104
1105        let mut html_generator = ClassedHTMLGenerator::new_with_class_style(
1106            syntax,
1107            &self.syntax_set,
1108            ClassStyle::Spaced,
1109        );
1110
1111        for line in LinesWithEndings::from(code) {
1112            let _ = html_generator.parse_html_for_line_which_includes_newline(line);
1113        }
1114
1115        html_generator.finalize()
1116    }
1117
1118    fn list_syntaxes(&self) -> Vec<(String, Vec<String>)> {
1119        self.syntax_set
1120            .syntaxes()
1121            .iter()
1122            .map(|s| (s.name.clone(), s.file_extensions.clone()))
1123            .collect()
1124    }
1125}
1126
1127static HIGHLIGHTER: OnceLock<SyntaxHighlighter> = OnceLock::new();
1128
1129fn get_highlighter() -> &'static SyntaxHighlighter {
1130    HIGHLIGHTER.get_or_init(SyntaxHighlighter::new)
1131}
1132
1133// === .highlight command ===
1134
/// Stateless handle for the `.highlight` command.
#[derive(Clone)]
pub struct HighlightCommand;

impl HighlightCommand {
    /// Creates the command value; there is no state to initialise.
    pub fn new() -> Self {
        HighlightCommand
    }
}

impl Default for HighlightCommand {
    /// Same as [`HighlightCommand::new`].
    fn default() -> Self {
        HighlightCommand::new()
    }
}
1149
1150impl Command for HighlightCommand {
1151    fn name(&self) -> &str {
1152        ".highlight"
1153    }
1154
1155    fn description(&self) -> &str {
1156        "Syntax highlight code, outputting HTML with CSS classes"
1157    }
1158
1159    fn signature(&self) -> Signature {
1160        Signature::build(".highlight")
1161            .required("lang", SyntaxShape::String, "language for highlighting")
1162            .input_output_types(vec![(Type::String, Type::record())])
1163            .category(Category::Custom("http".into()))
1164    }
1165
1166    fn run(
1167        &self,
1168        engine_state: &EngineState,
1169        stack: &mut Stack,
1170        call: &Call,
1171        input: PipelineData,
1172    ) -> Result<PipelineData, ShellError> {
1173        let head = call.head;
1174        let lang: String = call.req(engine_state, stack, 0)?;
1175
1176        let code = match input {
1177            PipelineData::Value(Value::String { val, .. }, _) => val,
1178            PipelineData::ByteStream(stream, _) => stream.into_string()?,
1179            _ => {
1180                return Err(ShellError::TypeMismatch {
1181                    err_message: "expected string input".into(),
1182                    span: head,
1183                });
1184            }
1185        };
1186
1187        let highlighter = get_highlighter();
1188        let html = highlighter.highlight(&code, Some(&lang));
1189
1190        Ok(Value::record(
1191            nu_protocol::record! {
1192                "__html" => Value::string(html, head),
1193            },
1194            head,
1195        )
1196        .into_pipeline_data())
1197    }
1198}
1199
1200// === .highlight theme command ===
1201
/// Stateless handle for the `.highlight theme` command.
#[derive(Clone)]
pub struct HighlightThemeCommand;

impl HighlightThemeCommand {
    /// Creates the command value; there is no state to initialise.
    pub fn new() -> Self {
        HighlightThemeCommand
    }
}

impl Default for HighlightThemeCommand {
    /// Same as [`HighlightThemeCommand::new`].
    fn default() -> Self {
        HighlightThemeCommand::new()
    }
}
1216
1217impl Command for HighlightThemeCommand {
1218    fn name(&self) -> &str {
1219        ".highlight theme"
1220    }
1221
1222    fn description(&self) -> &str {
1223        "List available themes or get CSS for a specific theme"
1224    }
1225
1226    fn signature(&self) -> Signature {
1227        Signature::build(".highlight theme")
1228            .optional("name", SyntaxShape::String, "theme name (omit to list all)")
1229            .input_output_types(vec![
1230                (Type::Nothing, Type::List(Box::new(Type::String))),
1231                (Type::Nothing, Type::String),
1232            ])
1233            .category(Category::Custom("http".into()))
1234    }
1235
1236    fn run(
1237        &self,
1238        engine_state: &EngineState,
1239        stack: &mut Stack,
1240        call: &Call,
1241        _input: PipelineData,
1242    ) -> Result<PipelineData, ShellError> {
1243        let head = call.head;
1244        let name: Option<String> = call.opt(engine_state, stack, 0)?;
1245
1246        let assets = syntect_assets::assets::HighlightingAssets::from_binary();
1247
1248        match name {
1249            None => {
1250                let themes: Vec<Value> = assets.themes().map(|t| Value::string(t, head)).collect();
1251                Ok(Value::list(themes, head).into_pipeline_data())
1252            }
1253            Some(theme_name) => {
1254                let theme = assets.get_theme(&theme_name);
1255                let css = syntect::html::css_for_theme_with_class_style(theme, ClassStyle::Spaced)
1256                    .map_err(|e| {
1257                        ShellError::Generic(GenericError::new(
1258                            format!("Failed to generate CSS: {e}"),
1259                            e.to_string(),
1260                            head,
1261                        ))
1262                    })?;
1263                Ok(Value::string(css, head).into_pipeline_data())
1264            }
1265        }
1266    }
1267}
1268
1269// === .highlight lang command ===
1270
/// Stateless handle for the `.highlight lang` command.
#[derive(Clone)]
pub struct HighlightLangCommand;

impl HighlightLangCommand {
    /// Creates the command value; there is no state to initialise.
    pub fn new() -> Self {
        HighlightLangCommand
    }
}

impl Default for HighlightLangCommand {
    /// Same as [`HighlightLangCommand::new`].
    fn default() -> Self {
        HighlightLangCommand::new()
    }
}
1285
1286impl Command for HighlightLangCommand {
1287    fn name(&self) -> &str {
1288        ".highlight lang"
1289    }
1290
1291    fn description(&self) -> &str {
1292        "List available languages for syntax highlighting"
1293    }
1294
1295    fn signature(&self) -> Signature {
1296        Signature::build(".highlight lang")
1297            .input_output_types(vec![(Type::Nothing, Type::List(Box::new(Type::record())))])
1298            .category(Category::Custom("http".into()))
1299    }
1300
1301    fn run(
1302        &self,
1303        _engine_state: &EngineState,
1304        _stack: &mut Stack,
1305        call: &Call,
1306        _input: PipelineData,
1307    ) -> Result<PipelineData, ShellError> {
1308        let head = call.head;
1309        let highlighter = get_highlighter();
1310        let langs: Vec<Value> = highlighter
1311            .list_syntaxes()
1312            .into_iter()
1313            .map(|(name, exts)| {
1314                Value::record(
1315                    nu_protocol::record! {
1316                        "name" => Value::string(name, head),
1317                        "extensions" => Value::list(
1318                            exts.into_iter().map(|e| Value::string(e, head)).collect(),
1319                            head
1320                        ),
1321                    },
1322                    head,
1323                )
1324            })
1325            .collect();
1326        Ok(Value::list(langs, head).into_pipeline_data())
1327    }
1328}
1329
1330// === .md command ===
1331
1332use pulldown_cmark::{html, CodeBlockKind, Event, Parser as MarkdownParser, Tag, TagEnd};
1333
/// Stateless handle for the `.md` command.
#[derive(Clone)]
pub struct MdCommand;

impl MdCommand {
    /// Creates the command value; there is no state to initialise.
    pub fn new() -> Self {
        MdCommand
    }
}

impl Default for MdCommand {
    /// Same as [`MdCommand::new`].
    fn default() -> Self {
        MdCommand::new()
    }
}
1348
1349impl Command for MdCommand {
1350    fn name(&self) -> &str {
1351        ".md"
1352    }
1353
1354    fn description(&self) -> &str {
1355        "Convert Markdown to HTML with syntax-highlighted code blocks"
1356    }
1357
1358    fn signature(&self) -> Signature {
1359        Signature::build(".md")
1360            .input_output_types(vec![
1361                (Type::String, Type::record()),
1362                (Type::record(), Type::record()),
1363            ])
1364            .category(Category::Custom("http".into()))
1365    }
1366
1367    fn run(
1368        &self,
1369        _engine_state: &EngineState,
1370        _stack: &mut Stack,
1371        call: &Call,
1372        input: PipelineData,
1373    ) -> Result<PipelineData, ShellError> {
1374        let head = call.head;
1375
1376        // Determine if input is trusted ({__html: ...}) or untrusted (plain string)
1377        let (markdown, trusted) = match input.into_value(head)? {
1378            Value::String { val, .. } => (val, false),
1379            Value::Record { val, .. } => {
1380                if let Some(html_val) = val.get("__html") {
1381                    (html_val.as_str()?.to_string(), true)
1382                } else {
1383                    return Err(ShellError::TypeMismatch {
1384                        err_message: "expected string or {__html: ...}".into(),
1385                        span: head,
1386                    });
1387                }
1388            }
1389            other => {
1390                return Err(ShellError::TypeMismatch {
1391                    err_message: format!(
1392                        "expected string or {{__html: ...}}, got {}",
1393                        other.get_type()
1394                    ),
1395                    span: head,
1396                });
1397            }
1398        };
1399
1400        let highlighter = get_highlighter();
1401
1402        let mut in_code_block = false;
1403        let mut current_code = String::new();
1404        let mut current_lang: Option<String> = None;
1405
1406        let mut options = pulldown_cmark::Options::empty();
1407        options.insert(pulldown_cmark::Options::ENABLE_TABLES);
1408        options.insert(pulldown_cmark::Options::ENABLE_STRIKETHROUGH);
1409        options.insert(pulldown_cmark::Options::ENABLE_TASKLISTS);
1410        options.insert(pulldown_cmark::Options::ENABLE_FOOTNOTES);
1411        options.insert(pulldown_cmark::Options::ENABLE_HEADING_ATTRIBUTES);
1412        options.insert(pulldown_cmark::Options::ENABLE_GFM);
1413        options.insert(pulldown_cmark::Options::ENABLE_DEFINITION_LIST);
1414        let parser = MarkdownParser::new_ext(&markdown, options).map(|event| match event {
1415            Event::Start(Tag::CodeBlock(kind)) => {
1416                in_code_block = true;
1417                current_code.clear();
1418                current_lang = match kind {
1419                    CodeBlockKind::Fenced(info) => {
1420                        let lang = info.split_whitespace().next().unwrap_or("");
1421                        if lang.is_empty() {
1422                            None
1423                        } else {
1424                            Some(lang.to_string())
1425                        }
1426                    }
1427                    CodeBlockKind::Indented => None,
1428                };
1429                Event::Text("".into())
1430            }
1431            Event::End(TagEnd::CodeBlock) => {
1432                in_code_block = false;
1433                let highlighted = highlighter.highlight(&current_code, current_lang.as_deref());
1434                let mut html_out = String::new();
1435                html_out.push_str("<pre><code");
1436                if let Some(lang) = &current_lang {
1437                    let lang = v_htmlescape::escape(lang);
1438                    html_out.push_str(&format!(" class=\"language-{lang}\""));
1439                }
1440                html_out.push('>');
1441                html_out.push_str(&highlighted);
1442                html_out.push_str("</code></pre>");
1443                Event::Html(html_out.into())
1444            }
1445            Event::Text(text) => {
1446                if in_code_block {
1447                    current_code.push_str(&text);
1448                    Event::Text("".into())
1449                } else {
1450                    Event::Text(text)
1451                }
1452            }
1453            // Escape raw HTML if input is untrusted
1454            Event::Html(html) => {
1455                if trusted {
1456                    Event::Html(html)
1457                } else {
1458                    Event::Text(html) // push_html escapes Text
1459                }
1460            }
1461            Event::InlineHtml(html) => {
1462                if trusted {
1463                    Event::InlineHtml(html)
1464                } else {
1465                    Event::Text(html)
1466                }
1467            }
1468            e => e,
1469        });
1470
1471        let mut html_output = String::new();
1472        html::push_html(&mut html_output, parser);
1473
1474        Ok(Value::record(
1475            nu_protocol::record! {
1476                "__html" => Value::string(html_output, head),
1477            },
1478            head,
1479        )
1480        .into_pipeline_data())
1481    }
1482}
1483
1484// === .print command ===
1485
/// Stateless handle for the `print` command.
#[derive(Clone)]
pub struct PrintCommand;

impl PrintCommand {
    /// Creates the command value; there is no state to initialise.
    pub fn new() -> Self {
        PrintCommand
    }
}

impl Default for PrintCommand {
    /// Same as [`PrintCommand::new`].
    fn default() -> Self {
        PrintCommand::new()
    }
}
1500
1501impl Command for PrintCommand {
1502    fn name(&self) -> &str {
1503        "print"
1504    }
1505
1506    fn description(&self) -> &str {
1507        "Print the given values to the http-nu logging system."
1508    }
1509
1510    fn extra_description(&self) -> &str {
1511        r#"This command outputs to http-nu's logging system rather than stdout/stderr.
1512Messages appear in both human-readable and JSONL output modes.
1513
1514`print` may be used inside blocks of code (e.g.: hooks) to display text during execution without interfering with the pipeline."#
1515    }
1516
1517    fn search_terms(&self) -> Vec<&str> {
1518        vec!["display"]
1519    }
1520
1521    fn signature(&self) -> Signature {
1522        Signature::build("print")
1523            .input_output_types(vec![
1524                (Type::Nothing, Type::Nothing),
1525                (Type::Any, Type::Nothing),
1526            ])
1527            .allow_variants_without_examples(true)
1528            .rest("rest", SyntaxShape::Any, "the values to print")
1529            .switch(
1530                "no-newline",
1531                "print without inserting a newline for the line ending",
1532                Some('n'),
1533            )
1534            .switch("stderr", "print to stderr instead of stdout", Some('e'))
1535            .switch(
1536                "raw",
1537                "print without formatting (including binary data)",
1538                Some('r'),
1539            )
1540            .category(Category::Strings)
1541    }
1542
1543    fn run(
1544        &self,
1545        engine_state: &EngineState,
1546        stack: &mut Stack,
1547        call: &Call,
1548        input: PipelineData,
1549    ) -> Result<PipelineData, ShellError> {
1550        let args: Vec<Value> = call.rest(engine_state, stack, 0)?;
1551        let no_newline = call.has_flag(engine_state, stack, "no-newline")?;
1552        let config = stack.get_config(engine_state);
1553
1554        let format_value = |val: &Value| -> String { val.to_expanded_string(" ", &config) };
1555
1556        if !args.is_empty() {
1557            let messages: Vec<String> = args.iter().map(format_value).collect();
1558            let message = if no_newline {
1559                messages.join("")
1560            } else {
1561                messages.join("\n")
1562            };
1563            log_print(&message);
1564        } else if !input.is_nothing() {
1565            let message = match input {
1566                PipelineData::Value(val, _) => format_value(&val),
1567                PipelineData::ListStream(stream, _) => {
1568                    let vals: Vec<String> = stream.into_iter().map(|v| format_value(&v)).collect();
1569                    vals.join("\n")
1570                }
1571                PipelineData::ByteStream(stream, _) => stream.into_string()?,
1572                PipelineData::Empty => String::new(),
1573            };
1574            if !message.is_empty() {
1575                log_print(&message);
1576            }
1577        }
1578
1579        Ok(PipelineData::empty())
1580    }
1581
1582    fn examples(&self) -> Vec<Example<'_>> {
1583        vec![
1584            Example {
1585                description: "Print 'hello world'",
1586                example: r#"print "hello world""#,
1587                result: None,
1588            },
1589            Example {
1590                description: "Print the sum of 2 and 3",
1591                example: r#"print (2 + 3)"#,
1592                result: None,
1593            },
1594        ]
1595    }
1596}
1597
1598// === .run: parse, compile, and evaluate a nushell pipeline string in a sandbox ===
1599
/// Stateless handle for the `.run` command.
#[derive(Clone)]
pub struct RunNuCommand;

impl RunNuCommand {
    /// Creates the command value; there is no state to initialise.
    pub fn new() -> Self {
        RunNuCommand
    }
}

impl Default for RunNuCommand {
    /// Same as [`RunNuCommand::new`].
    fn default() -> Self {
        RunNuCommand::new()
    }
}
1614
1615impl Command for RunNuCommand {
1616    fn name(&self) -> &str {
1617        ".run"
1618    }
1619
1620    fn description(&self) -> &str {
1621        "Parse, compile, and evaluate a nushell pipeline string in a sandboxed engine state."
1622    }
1623
1624    fn extra_description(&self) -> &str {
1625        r#"The submitted script is parsed and compiled against a clone of the current engine state.
1626Any defs, lets, or modules introduced by the script live only for the duration of the call --
1627they do not leak back into the calling engine. Pipeline input is forwarded to the script as `$in`;
1628the caller's local bindings and environment are not visible inside the sandbox."#
1629    }
1630
1631    fn signature(&self) -> Signature {
1632        Signature::build(".run")
1633            .input_output_types(vec![(Type::Any, Type::Any)])
1634            .required("script", SyntaxShape::String, "nushell source to evaluate")
1635            .category(Category::Experimental)
1636    }
1637
1638    fn run(
1639        &self,
1640        engine_state: &EngineState,
1641        stack: &mut Stack,
1642        call: &Call,
1643        input: PipelineData,
1644    ) -> Result<PipelineData, ShellError> {
1645        use nu_engine::eval_block_with_early_return;
1646        use nu_parser::parse;
1647        use nu_protocol::{debugger::WithoutDebug, engine::StateWorkingSet, format_cli_error};
1648
1649        let script: String = call.req(engine_state, stack, 0)?;
1650
1651        let mut ws = StateWorkingSet::new(engine_state);
1652        let block = parse(&mut ws, Some("<.run>"), script.as_bytes(), false);
1653
1654        if !ws.parse_errors.is_empty() {
1655            let mut combined = String::new();
1656            for err in &ws.parse_errors {
1657                let se = ShellError::Generic(GenericError::new(
1658                    "Parse error",
1659                    format!("{err}"),
1660                    err.span(),
1661                ));
1662                combined.push_str(&format_cli_error(None, &ws, &se, None));
1663                combined.push('\n');
1664            }
1665            let plain = nu_utils::strip_ansi_string_likely(combined);
1666            return Err(ShellError::Generic(GenericError::new_internal(plain, "")));
1667        }
1668        if !ws.compile_errors.is_empty() {
1669            let mut combined = String::new();
1670            for err in &ws.compile_errors {
1671                let se = ShellError::Generic(GenericError::new_internal(
1672                    format!("Compile error {err}"),
1673                    "",
1674                ));
1675                combined.push_str(&format_cli_error(None, &ws, &se, None));
1676                combined.push('\n');
1677            }
1678            let plain = nu_utils::strip_ansi_string_likely(combined);
1679            return Err(ShellError::Generic(GenericError::new_internal(plain, "")));
1680        }
1681
1682        let mut sandbox = engine_state.clone();
1683        sandbox.merge_delta(ws.render())?;
1684
1685        let mut sub_stack = Stack::new();
1686        eval_block_with_early_return::<WithoutDebug>(&sandbox, &mut sub_stack, &block, input)
1687            .map(|exec| exec.body)
1688    }
1689}
1690
1691// === .bus pub / .bus sub: ephemeral local pub/sub ===
1692
1693#[derive(Clone)]
1694pub struct BusPubCommand {
1695    bus: Arc<Bus>,
1696}
1697
1698impl BusPubCommand {
1699    pub fn new(bus: Arc<Bus>) -> Self {
1700        Self { bus }
1701    }
1702}
1703
1704impl Command for BusPubCommand {
1705    fn name(&self) -> &str {
1706        ".bus pub"
1707    }
1708
1709    fn description(&self) -> &str {
1710        "Publish a value to an ephemeral in-process topic."
1711    }
1712
1713    fn signature(&self) -> Signature {
1714        Signature::build(".bus pub")
1715            .input_output_types(vec![(Type::Any, Type::Nothing)])
1716            .required("topic", SyntaxShape::String, "topic to publish to")
1717            .category(Category::Experimental)
1718    }
1719
1720    fn run(
1721        &self,
1722        engine_state: &EngineState,
1723        stack: &mut Stack,
1724        call: &Call,
1725        input: PipelineData,
1726    ) -> Result<PipelineData, ShellError> {
1727        let topic: String = call.req(engine_state, stack, 0)?;
1728        let value = input.into_value(call.head)?;
1729        self.bus.publish(topic, value);
1730        Ok(PipelineData::empty())
1731    }
1732}
1733
1734#[derive(Clone)]
1735pub struct BusSubCommand {
1736    bus: Arc<Bus>,
1737}
1738
1739impl BusSubCommand {
1740    pub fn new(bus: Arc<Bus>) -> Self {
1741        Self { bus }
1742    }
1743}
1744
1745impl Command for BusSubCommand {
1746    fn name(&self) -> &str {
1747        ".bus sub"
1748    }
1749
1750    fn description(&self) -> &str {
1751        "Subscribe to ephemeral in-process topics; yields {topic, value} records."
1752    }
1753
1754    fn extra_description(&self) -> &str {
1755        r#"With no argument, yields every published event. With a glob pattern (e.g. "tab-abc.*"),
1756yields only events whose topic matches. `*` matches any run of characters including dots."#
1757    }
1758
1759    fn signature(&self) -> Signature {
1760        Signature::build(".bus sub")
1761            .input_output_types(vec![(Type::Nothing, Type::Any)])
1762            .optional(
1763                "pattern",
1764                SyntaxShape::String,
1765                "glob pattern; omit for all topics",
1766            )
1767            .category(Category::Experimental)
1768    }
1769
1770    fn run(
1771        &self,
1772        engine_state: &EngineState,
1773        stack: &mut Stack,
1774        call: &Call,
1775        _input: PipelineData,
1776    ) -> Result<PipelineData, ShellError> {
1777        use nu_protocol::{record, ListStream, Signals};
1778
1779        let pattern: Option<String> = call.opt(engine_state, stack, 0)?;
1780        let span = call.head;
1781        let signals = engine_state.signals().clone();
1782
1783        let mut sub = self.bus.subscribe(pattern);
1784
1785        let (tx, rx) = std::sync::mpsc::channel::<crate::bus::BusEvent>();
1786        std::thread::spawn(move || {
1787            let rt = match tokio::runtime::Runtime::new() {
1788                Ok(rt) => rt,
1789                Err(_) => return,
1790            };
1791            rt.block_on(async move {
1792                while let Some(ev) = sub.recv().await {
1793                    if tx.send(ev).is_err() {
1794                        break;
1795                    }
1796                }
1797            });
1798        });
1799
1800        let stream = ListStream::new(
1801            std::iter::from_fn(move || {
1802                use std::sync::mpsc::RecvTimeoutError;
1803                use std::time::Duration;
1804                loop {
1805                    if signals.interrupted() {
1806                        return None;
1807                    }
1808                    match rx.recv_timeout(Duration::from_millis(100)) {
1809                        Ok(ev) => {
1810                            let rec = record! {
1811                                "topic" => Value::string(ev.topic, span),
1812                                "value" => ev.value,
1813                            };
1814                            return Some(Value::record(rec, span));
1815                        }
1816                        Err(RecvTimeoutError::Timeout) => continue,
1817                        Err(RecvTimeoutError::Disconnected) => return None,
1818                    }
1819                }
1820            }),
1821            span,
1822            Signals::empty(),
1823        );
1824
1825        Ok(PipelineData::ListStream(stream, None))
1826    }
1827}
1828
#[cfg(test)]
mod tests {
    use super::event_to_string;
    use nu_protocol::{record, Config, Value};

    // Regression: a record carrying only a `comment` field is serialized as a
    // single SSE comment line (`: text`) followed by the blank terminator line.
    #[test]
    fn comment_only_emits_sse_comment() {
        let event = Value::test_record(record! { "comment" => Value::test_string("hb") });
        let rendered = event_to_string(&Config::default(), event).unwrap();
        let expected = ": hb\n\n";
        assert_eq!(rendered, expected);
    }

    // With both `comment` and `data` present, the comment line is emitted
    // ahead of the data line, matching field order.
    #[test]
    fn comment_precedes_data() {
        let event = Value::test_record(record! {
            "comment" => Value::test_string("hb"),
            "data" => Value::test_string("x"),
        });
        let rendered = event_to_string(&Config::default(), event).unwrap();
        let expected = ": hb\ndata: x\n\n";
        assert_eq!(rendered, expected);
    }
}