1use crate::bus::Bus;
2use crate::logging::log_print;
3use crate::response::{Response, ResponseBodyType};
4use nu_engine::command_prelude::*;
5use nu_protocol::{
6 shell_error::generic::GenericError, ByteStream, ByteStreamType, Category, Config, CustomValue,
7 PipelineData, PipelineMetadata, ShellError, Signature, Span, SyntaxShape, Type, Value,
8};
9use serde::{Deserialize, Serialize};
10use std::cell::RefCell;
11use std::collections::HashMap;
12use std::io::Read;
13use std::path::PathBuf;
14use tokio::sync::oneshot;
15
16use minijinja::{path_loader, AutoEscape, Environment};
17use std::sync::{Arc, OnceLock, RwLock};
18
19use syntect::html::{ClassStyle, ClassedHTMLGenerator};
20use syntect::parsing::SyntaxSet;
21use syntect::util::LinesWithEndings;
22
23type TemplateCache = RwLock<HashMap<u128, Arc<Environment<'static>>>>;
26
27static TEMPLATE_CACHE: OnceLock<TemplateCache> = OnceLock::new();
28
29fn get_cache() -> &'static TemplateCache {
30 TEMPLATE_CACHE.get_or_init(|| RwLock::new(HashMap::new()))
31}
32
33fn hash_source_and_path(source: &str, base_dir: &std::path::Path) -> u128 {
34 let mut data = source.as_bytes().to_vec();
35 data.extend_from_slice(base_dir.to_string_lossy().as_bytes());
36 xxhash_rust::xxh3::xxh3_128(&data)
37}
38
39fn compile_template(source: &str, base_dir: &std::path::Path) -> Result<u128, minijinja::Error> {
41 compile_template_with_loader(source, base_dir, path_loader(base_dir))
42}
43
44fn compile_template_with_loader<F>(
46 source: &str,
47 base_dir: &std::path::Path,
48 loader: F,
49) -> Result<u128, minijinja::Error>
50where
51 F: Fn(&str) -> Result<Option<String>, minijinja::Error> + Send + Sync + 'static,
52{
53 let hash = hash_source_and_path(source, base_dir);
54
55 let mut cache = get_cache().write().unwrap();
56 if cache.contains_key(&hash) {
57 return Ok(hash);
58 }
59
60 let mut env = Environment::new();
61 env.set_auto_escape_callback(|_| AutoEscape::Html);
62 env.set_loader(loader);
63 env.add_template_owned("template".to_string(), source.to_string())?;
64 cache.insert(hash, Arc::new(env));
65 Ok(hash)
66}
67
68fn get_compiled(hash: u128) -> Option<Arc<Environment<'static>>> {
70 get_cache().read().unwrap().get(&hash).map(Arc::clone)
71}
72
/// Reads the content of the most recent frame stored under `topic`.
///
/// Returns `None` when the topic has no frames, the newest frame carries no
/// content hash, the content cannot be read, or the content is not valid
/// UTF-8.
#[cfg(feature = "cross-stream")]
fn load_topic_content(store: &xs::store::Store, topic: &str) -> Option<String> {
    // Non-following read limited to the single newest frame of the topic.
    let newest_only = xs::store::ReadOptions::builder()
        .follow(xs::store::FollowOption::Off)
        .topic(topic.to_string())
        .last(1_usize)
        .build();

    let latest = store.read_sync(newest_only).last()?;
    let content_hash = latest.hash?;
    match store.cas_read_sync(&content_hash) {
        Ok(raw) => String::from_utf8(raw).ok(),
        Err(_) => None,
    }
}
86
87#[derive(Clone, Debug, Serialize, Deserialize)]
90pub struct CompiledTemplate {
91 hash: u128,
92}
93
94impl CompiledTemplate {
95 pub fn render(&self, context: &minijinja::Value) -> Result<String, minijinja::Error> {
97 let env = get_compiled(self.hash).expect("template not in cache");
98 let tmpl = env.get_template("template")?;
99 tmpl.render(context)
100 }
101}
102
103#[typetag::serde]
104impl CustomValue for CompiledTemplate {
105 fn clone_value(&self, span: Span) -> Value {
106 Value::custom(Box::new(self.clone()), span)
107 }
108
109 fn type_name(&self) -> String {
110 "CompiledTemplate".into()
111 }
112
113 fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
114 Ok(Value::string(format!("{:032x}", self.hash), span))
115 }
116
117 fn as_any(&self) -> &dyn std::any::Any {
118 self
119 }
120
121 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
122 self
123 }
124}
125
126thread_local! {
127 pub static RESPONSE_TX: RefCell<Option<oneshot::Sender<Response>>> = const { RefCell::new(None) };
128}
129
/// The `.static` command: responds with a file served from a directory.
#[derive(Clone)]
pub struct StaticCommand;

impl StaticCommand {
    /// Creates the command; it carries no state.
    pub fn new() -> Self {
        StaticCommand
    }
}

impl Default for StaticCommand {
    fn default() -> Self {
        StaticCommand::new()
    }
}
144
145impl Command for StaticCommand {
146 fn name(&self) -> &str {
147 ".static"
148 }
149
150 fn description(&self) -> &str {
151 "Serve static files from a directory"
152 }
153
154 fn signature(&self) -> Signature {
155 Signature::build(".static")
156 .required("root", SyntaxShape::String, "root directory path")
157 .required("path", SyntaxShape::String, "request path")
158 .named(
159 "fallback",
160 SyntaxShape::String,
161 "fallback file when request missing",
162 None,
163 )
164 .input_output_types(vec![(Type::Nothing, Type::Nothing)])
165 .category(Category::Custom("http".into()))
166 }
167
168 fn run(
169 &self,
170 engine_state: &EngineState,
171 stack: &mut Stack,
172 call: &Call,
173 _input: PipelineData,
174 ) -> Result<PipelineData, ShellError> {
175 let root: String = call.req(engine_state, stack, 0)?;
176 let path: String = call.req(engine_state, stack, 1)?;
177
178 let fallback: Option<String> = call.get_flag(engine_state, stack, "fallback")?;
179
180 let response = Response {
181 status: 200,
182 headers: HashMap::new(),
183 body_type: ResponseBodyType::Static {
184 root: PathBuf::from(root),
185 path,
186 fallback,
187 },
188 };
189
190 RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
191 if let Some(tx) = tx.borrow_mut().take() {
192 tx.send(response).map_err(|_| {
193 ShellError::Generic(GenericError::new(
194 "Failed to send response",
195 "Channel closed",
196 call.head,
197 ))
198 })?;
199 }
200 Ok(())
201 })?;
202
203 Ok(PipelineData::Empty)
204 }
205}
206
/// Terminator appended after every line written by the `to sse` helpers.
const LINE_ENDING: &str = "\n";
208
209#[derive(Clone)]
210pub struct ToSse;
211
212impl Command for ToSse {
213 fn name(&self) -> &str {
214 "to sse"
215 }
216
217 fn signature(&self) -> Signature {
218 Signature::build("to sse")
219 .input_output_types(vec![
220 (Type::record(), Type::String),
221 (Type::List(Box::new(Type::record())), Type::String),
222 ])
223 .category(Category::Formats)
224 }
225
226 fn description(&self) -> &str {
227 "Convert records into text/event-stream format"
228 }
229
230 fn search_terms(&self) -> Vec<&str> {
231 vec!["sse", "server", "event"]
232 }
233
234 fn examples(&self) -> Vec<Example<'_>> {
235 vec![Example {
236 description: "Convert a record into a server-sent event",
237 example: "{data: 'hello'} | to sse",
238 result: Some(Value::test_string("data: hello\n\n")),
239 }]
240 }
241
242 fn run(
243 &self,
244 engine_state: &EngineState,
245 stack: &mut Stack,
246 call: &Call,
247 input: PipelineData,
248 ) -> Result<PipelineData, ShellError> {
249 let head = call.head;
250 let config = stack.get_config(engine_state);
251 match input {
252 PipelineData::ListStream(stream, meta) => {
253 let span = stream.span();
254 let cfg = config.clone();
255 let iter = stream
256 .into_iter()
257 .map(move |val| event_to_string(&cfg, val));
258 let stream = ByteStream::from_result_iter(
259 iter,
260 span,
261 engine_state.signals().clone(),
262 ByteStreamType::String,
263 );
264 Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
265 }
266 PipelineData::Value(Value::List { vals, .. }, meta) => {
267 let cfg = config.clone();
268 let iter = vals.into_iter().map(move |val| event_to_string(&cfg, val));
269 let span = head;
270 let stream = ByteStream::from_result_iter(
271 iter,
272 span,
273 engine_state.signals().clone(),
274 ByteStreamType::String,
275 );
276 Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
277 }
278 PipelineData::Value(val, meta) => {
279 let out = event_to_string(&config, val)?;
280 Ok(
281 Value::string(out, head)
282 .into_pipeline_data_with_metadata(update_metadata(meta)),
283 )
284 }
285 PipelineData::Empty => Ok(PipelineData::Value(
286 Value::string(String::new(), head),
287 update_metadata(None),
288 )),
289 PipelineData::ByteStream(..) => Err(ShellError::TypeMismatch {
290 err_message: "expected record input".into(),
291 span: head,
292 }),
293 }
294 }
295}
296
297fn emit_data_lines(out: &mut String, s: &str) {
298 for line in s.lines() {
299 out.push_str("data: ");
300 out.push_str(line);
301 out.push_str(LINE_ENDING);
302 }
303}
304
305#[allow(clippy::result_large_err)]
306fn value_to_data_string(val: &Value, config: &Config) -> Result<String, ShellError> {
307 match val {
308 Value::String { val, .. } => Ok(val.clone()),
309 _ => {
310 let json_value = value_to_json(val, config).map_err(|err| {
311 ShellError::Generic(GenericError::new(
312 err.to_string(),
313 "failed to serialize json",
314 Span::unknown(),
315 ))
316 })?;
317 serde_json::to_string(&json_value).map_err(|err| {
318 ShellError::Generic(GenericError::new(
319 err.to_string(),
320 "failed to serialize json",
321 Span::unknown(),
322 ))
323 })
324 }
325 }
326}
327
328#[allow(clippy::result_large_err)]
329fn event_to_string(config: &Config, val: Value) -> Result<String, ShellError> {
330 let span = val.span();
331 let rec = match val {
332 Value::Record { val, .. } => val,
333 Value::Error { error, .. } => return Err(*error),
335 other => {
336 return Err(ShellError::TypeMismatch {
337 err_message: format!("expected record, got {}", other.get_type()),
338 span,
339 })
340 }
341 };
342 let mut out = String::new();
343 if let Some(comment) = rec.get("comment") {
346 if !matches!(comment, Value::Nothing { .. }) {
347 out.push_str(": ");
348 out.push_str(&comment.to_expanded_string("", config));
349 out.push_str(LINE_ENDING);
350 }
351 }
352 if let Some(event) = rec.get("event") {
353 if !matches!(event, Value::Nothing { .. }) {
354 out.push_str("event: ");
355 out.push_str(&event.to_expanded_string("", config));
356 out.push_str(LINE_ENDING);
357 }
358 }
359 if let Some(id) = rec.get("id") {
360 if !matches!(id, Value::Nothing { .. }) {
361 out.push_str("id: ");
362 out.push_str(&id.to_expanded_string("", config));
363 out.push_str(LINE_ENDING);
364 }
365 }
366 if let Some(retry) = rec.get("retry") {
367 if !matches!(retry, Value::Nothing { .. }) {
368 out.push_str("retry: ");
369 out.push_str(&retry.to_expanded_string("", config));
370 out.push_str(LINE_ENDING);
371 }
372 }
373 if let Some(data) = rec.get("data") {
374 if !matches!(data, Value::Nothing { .. }) {
375 match data {
376 Value::List { vals, .. } => {
377 for item in vals {
378 emit_data_lines(&mut out, &value_to_data_string(item, config)?);
379 }
380 }
381 _ => {
382 emit_data_lines(&mut out, &value_to_data_string(data, config)?);
383 }
384 }
385 }
386 }
387 out.push_str(LINE_ENDING);
388 Ok(out)
389}
390
391fn value_to_json(val: &Value, config: &Config) -> serde_json::Result<serde_json::Value> {
392 Ok(match val {
393 Value::Bool { val, .. } => serde_json::Value::Bool(*val),
394 Value::Int { val, .. } => serde_json::Value::from(*val),
395 Value::Float { val, .. } => serde_json::Number::from_f64(*val)
396 .map(serde_json::Value::Number)
397 .unwrap_or(serde_json::Value::Null),
398 Value::String { val, .. } => serde_json::Value::String(val.clone()),
399 Value::List { vals, .. } => serde_json::Value::Array(
400 vals.iter()
401 .map(|v| value_to_json(v, config))
402 .collect::<Result<Vec<_>, _>>()?,
403 ),
404 Value::Record { val, .. } => {
405 let mut map = serde_json::Map::new();
406 for (k, v) in val.iter() {
407 map.insert(k.clone(), value_to_json(v, config)?);
408 }
409 serde_json::Value::Object(map)
410 }
411 Value::Nothing { .. } => serde_json::Value::Null,
412 other => serde_json::Value::String(other.to_expanded_string("", config)),
413 })
414}
415
416fn update_metadata(metadata: Option<PipelineMetadata>) -> Option<PipelineMetadata> {
417 metadata
418 .map(|md| md.with_content_type(Some("text/event-stream".into())))
419 .or_else(|| {
420 Some(PipelineMetadata::default().with_content_type(Some("text/event-stream".into())))
421 })
422}
423
/// The `.reverse-proxy` command: responds by forwarding the request to a
/// backend server.
#[derive(Clone)]
pub struct ReverseProxyCommand;

impl ReverseProxyCommand {
    /// Creates the command; it carries no state.
    pub fn new() -> Self {
        ReverseProxyCommand
    }
}

impl Default for ReverseProxyCommand {
    fn default() -> Self {
        ReverseProxyCommand::new()
    }
}
438
439impl Command for ReverseProxyCommand {
440 fn name(&self) -> &str {
441 ".reverse-proxy"
442 }
443
444 fn description(&self) -> &str {
445 "Forward HTTP requests to a backend server"
446 }
447
448 fn signature(&self) -> Signature {
449 Signature::build(".reverse-proxy")
450 .required("target_url", SyntaxShape::String, "backend URL to proxy to")
451 .optional(
452 "config",
453 SyntaxShape::Record(vec![]),
454 "optional configuration (headers, preserve_host, strip_prefix, query)",
455 )
456 .input_output_types(vec![(Type::Any, Type::Nothing)])
457 .category(Category::Custom("http".into()))
458 }
459
460 fn run(
461 &self,
462 engine_state: &EngineState,
463 stack: &mut Stack,
464 call: &Call,
465 input: PipelineData,
466 ) -> Result<PipelineData, ShellError> {
467 let target_url: String = call.req(engine_state, stack, 0)?;
468
469 let request_body = match input {
471 PipelineData::Empty => Vec::new(),
472 PipelineData::Value(value, _) => crate::response::value_to_bytes(value),
473 PipelineData::ByteStream(stream, _) => {
474 let mut body_bytes = Vec::new();
476 if let Some(mut reader) = stream.reader() {
477 loop {
478 let mut buffer = vec![0; 8192];
479 match reader.read(&mut buffer) {
480 Ok(0) => break, Ok(n) => {
482 buffer.truncate(n);
483 body_bytes.extend_from_slice(&buffer);
484 }
485 Err(_) => break,
486 }
487 }
488 }
489 body_bytes
490 }
491 PipelineData::ListStream(stream, _) => {
492 let items: Vec<_> = stream.into_iter().collect();
494 let json_value = serde_json::Value::Array(
495 items
496 .into_iter()
497 .map(|v| crate::response::value_to_json(&v))
498 .collect(),
499 );
500 serde_json::to_string(&json_value)
501 .unwrap_or_default()
502 .into_bytes()
503 }
504 };
505
506 let config = call.opt::<Value>(engine_state, stack, 1);
508
509 let mut headers = HashMap::new();
510 let mut preserve_host = true;
511 let mut strip_prefix: Option<String> = None;
512 let mut query: Option<HashMap<String, String>> = None;
513
514 if let Ok(Some(config_value)) = config {
515 if let Ok(record) = config_value.as_record() {
516 if let Some(headers_value) = record.get("headers") {
518 if let Ok(headers_record) = headers_value.as_record() {
519 for (k, v) in headers_record.iter() {
520 let header_value = match v {
521 Value::String { val, .. } => {
522 crate::response::HeaderValue::Single(val.clone())
523 }
524 Value::List { vals, .. } => {
525 let strings: Vec<String> = vals
526 .iter()
527 .filter_map(|v| v.as_str().ok())
528 .map(|s| s.to_string())
529 .collect();
530 crate::response::HeaderValue::Multiple(strings)
531 }
532 _ => continue, };
534 headers.insert(k.clone(), header_value);
535 }
536 }
537 }
538
539 if let Some(preserve_host_value) = record.get("preserve_host") {
541 if let Ok(ph) = preserve_host_value.as_bool() {
542 preserve_host = ph;
543 }
544 }
545
546 if let Some(strip_prefix_value) = record.get("strip_prefix") {
548 if let Ok(prefix) = strip_prefix_value.as_str() {
549 strip_prefix = Some(prefix.to_string());
550 }
551 }
552
553 if let Some(query_value) = record.get("query") {
555 if let Ok(query_record) = query_value.as_record() {
556 let mut query_map = HashMap::new();
557 for (k, v) in query_record.iter() {
558 if let Ok(v_str) = v.as_str() {
559 query_map.insert(k.clone(), v_str.to_string());
560 }
561 }
562 query = Some(query_map);
563 }
564 }
565 }
566 }
567
568 let response = Response {
569 status: 200,
570 headers: HashMap::new(),
571 body_type: ResponseBodyType::ReverseProxy {
572 target_url,
573 headers,
574 preserve_host,
575 strip_prefix,
576 request_body,
577 query,
578 },
579 };
580
581 RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
582 if let Some(tx) = tx.borrow_mut().take() {
583 tx.send(response).map_err(|_| {
584 ShellError::Generic(GenericError::new(
585 "Failed to send response",
586 "Channel closed",
587 call.head,
588 ))
589 })?;
590 }
591 Ok(())
592 })?;
593
594 Ok(PipelineData::Empty)
595 }
596}
597
/// The `.mj` command: renders a minijinja template in one step.
#[derive(Clone)]
pub struct MjCommand {
    /// Store used to resolve `--topic` templates; `None` when no store was
    /// supplied.
    #[cfg(feature = "cross-stream")]
    store: Option<xs::store::Store>,
}

impl MjCommand {
    /// Creates the command without a store.
    pub fn new() -> Self {
        MjCommand {
            #[cfg(feature = "cross-stream")]
            store: None,
        }
    }

    /// Creates the command with a store that `--topic` can load templates
    /// from.
    #[cfg(feature = "cross-stream")]
    pub fn with_store(store: xs::store::Store) -> Self {
        MjCommand { store: Some(store) }
    }
}

impl Default for MjCommand {
    fn default() -> Self {
        MjCommand::new()
    }
}
623
624impl Command for MjCommand {
625 fn name(&self) -> &str {
626 ".mj"
627 }
628
629 fn description(&self) -> &str {
630 "Render a minijinja template with context from input"
631 }
632
633 fn signature(&self) -> Signature {
634 Signature::build(".mj")
635 .optional("file", SyntaxShape::String, "template file path")
636 .named(
637 "inline",
638 SyntaxShape::String,
639 "inline template string",
640 Some('i'),
641 )
642 .named(
643 "topic",
644 SyntaxShape::String,
645 "load template from a store topic",
646 Some('t'),
647 )
648 .input_output_types(vec![(Type::Record(vec![].into()), Type::String)])
649 .category(Category::Custom("http".into()))
650 }
651
652 fn run(
653 &self,
654 engine_state: &EngineState,
655 stack: &mut Stack,
656 call: &Call,
657 input: PipelineData,
658 ) -> Result<PipelineData, ShellError> {
659 let head = call.head;
660 let file: Option<String> = call.opt(engine_state, stack, 0)?;
661 let inline: Option<String> = call.get_flag(engine_state, stack, "inline")?;
662 let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
663
664 let mode_count = file.is_some() as u8 + inline.is_some() as u8 + topic.is_some() as u8;
665 if mode_count > 1 {
666 return Err(ShellError::Generic(GenericError::new(
667 "Cannot combine file, --inline, and --topic",
668 "use exactly one of: file path, --inline, or --topic",
669 head,
670 )));
671 }
672 if mode_count == 0 {
673 return Err(ShellError::Generic(GenericError::new(
674 "No template specified",
675 "provide a file path, --inline, or --topic",
676 head,
677 )));
678 }
679
680 let context = match input {
682 PipelineData::Value(val, _) => nu_value_to_minijinja(&val),
683 PipelineData::Empty => minijinja::Value::from(()),
684 _ => {
685 return Err(ShellError::TypeMismatch {
686 err_message: "expected record input".into(),
687 span: head,
688 });
689 }
690 };
691
692 let mut env = Environment::new();
694 env.set_auto_escape_callback(|_| AutoEscape::Html);
695 let tmpl = if let Some(ref path) = file {
696 let path = std::path::Path::new(path);
698 let abs_path = if path.is_absolute() {
699 path.to_path_buf()
700 } else {
701 std::env::current_dir().unwrap_or_default().join(path)
702 };
703 if let Some(parent) = abs_path.parent() {
704 env.set_loader(path_loader(parent));
705 }
706 let name = abs_path
707 .file_name()
708 .and_then(|n| n.to_str())
709 .unwrap_or("template");
710 env.get_template(name).map_err(|e| {
711 ShellError::Generic(GenericError::new(
712 format!("Template error: {e}"),
713 e.to_string(),
714 head,
715 ))
716 })?
717 } else if let Some(ref topic_name) = topic {
718 #[cfg(feature = "cross-stream")]
720 {
721 let store = self.store.as_ref().ok_or_else(|| {
722 ShellError::Generic(GenericError::new(
723 "--topic requires --store",
724 "server must be started with --store to use --topic",
725 head,
726 ))
727 })?;
728 let source = load_topic_content(store, topic_name).ok_or_else(|| {
729 ShellError::Generic(GenericError::new(
730 format!("Topic not found: {topic_name}"),
731 "no content in store for this topic",
732 head,
733 ))
734 })?;
735 let topic_store = store.clone();
736 env.set_loader(move |name: &str| Ok(load_topic_content(&topic_store, name)));
737 env.add_template_owned(topic_name.clone(), source)
738 .map_err(|e| {
739 ShellError::Generic(GenericError::new(
740 format!("Template parse error: {e}"),
741 e.to_string(),
742 head,
743 ))
744 })?;
745 env.get_template(topic_name).map_err(|e| {
746 ShellError::Generic(GenericError::new(
747 format!("Template error: {e}"),
748 e.to_string(),
749 head,
750 ))
751 })?
752 }
753 #[cfg(not(feature = "cross-stream"))]
754 {
755 let _ = topic_name;
756 return Err(ShellError::Generic(GenericError::new(
757 "--topic requires cross-stream feature",
758 "built without store support",
759 head,
760 )));
761 }
762 } else {
763 let source = inline.unwrap();
765 env.add_template_owned("template".to_string(), source)
766 .map_err(|e| {
767 ShellError::Generic(GenericError::new(
768 format!("Template parse error: {e}"),
769 e.to_string(),
770 head,
771 ))
772 })?;
773 env.get_template("template").map_err(|e| {
774 ShellError::Generic(GenericError::new(
775 format!("Failed to get template: {e}"),
776 e.to_string(),
777 head,
778 ))
779 })?
780 };
781
782 let rendered = tmpl.render(&context).map_err(|e| {
783 ShellError::Generic(GenericError::new(
784 format!("Template render error: {e}"),
785 e.to_string(),
786 head,
787 ))
788 })?;
789
790 Ok(Value::string(rendered, head).into_pipeline_data())
791 }
792}
793
794fn nu_value_to_minijinja(val: &Value) -> minijinja::Value {
796 let json = value_to_json(val, &Config::default()).unwrap_or(serde_json::Value::Null);
797 minijinja::Value::from_serialize(&json)
798}
799
/// The `.mj compile` command: compiles a template once for repeated
/// rendering.
#[derive(Clone)]
pub struct MjCompileCommand {
    /// Store used to resolve `--topic` templates; `None` when no store was
    /// supplied.
    #[cfg(feature = "cross-stream")]
    store: Option<xs::store::Store>,
}

impl MjCompileCommand {
    /// Creates the command without a store.
    pub fn new() -> Self {
        MjCompileCommand {
            #[cfg(feature = "cross-stream")]
            store: None,
        }
    }

    /// Creates the command with a store that `--topic` can load templates
    /// from.
    #[cfg(feature = "cross-stream")]
    pub fn with_store(store: xs::store::Store) -> Self {
        MjCompileCommand { store: Some(store) }
    }
}

impl Default for MjCompileCommand {
    fn default() -> Self {
        MjCompileCommand::new()
    }
}
827
828impl Command for MjCompileCommand {
829 fn name(&self) -> &str {
830 ".mj compile"
831 }
832
833 fn description(&self) -> &str {
834 "Compile a minijinja template, returning a reusable compiled template"
835 }
836
837 fn signature(&self) -> Signature {
838 Signature::build(".mj compile")
839 .optional("file", SyntaxShape::String, "template file path")
840 .named(
841 "inline",
842 SyntaxShape::Any,
843 "inline template (string or {__html: string})",
844 Some('i'),
845 )
846 .named(
847 "topic",
848 SyntaxShape::String,
849 "load template from a store topic",
850 Some('t'),
851 )
852 .input_output_types(vec![(
853 Type::Nothing,
854 Type::Custom("CompiledTemplate".into()),
855 )])
856 .category(Category::Custom("http".into()))
857 }
858
859 fn run(
860 &self,
861 engine_state: &EngineState,
862 stack: &mut Stack,
863 call: &Call,
864 _input: PipelineData,
865 ) -> Result<PipelineData, ShellError> {
866 let head = call.head;
867 let file: Option<String> = call.opt(engine_state, stack, 0)?;
868 let inline: Option<Value> = call.get_flag(engine_state, stack, "inline")?;
869 let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
870
871 let inline_str: Option<String> = match &inline {
873 None => None,
874 Some(val) => match val {
875 Value::String { val, .. } => Some(val.clone()),
876 Value::Record { val, .. } => {
877 if let Some(html_val) = val.get("__html") {
878 match html_val {
879 Value::String { val, .. } => Some(val.clone()),
880 _ => {
881 return Err(ShellError::Generic(GenericError::new(
882 "__html must be a string",
883 "expected string value",
884 head,
885 )));
886 }
887 }
888 } else {
889 return Err(ShellError::Generic(GenericError::new(
890 "Record must have __html field",
891 "expected {__html: string}",
892 head,
893 )));
894 }
895 }
896 _ => {
897 return Err(ShellError::Generic(GenericError::new(
898 "--inline must be string or {__html: string}",
899 "invalid type",
900 head,
901 )));
902 }
903 },
904 };
905
906 let mode_count = file.is_some() as u8 + inline_str.is_some() as u8 + topic.is_some() as u8;
907 if mode_count > 1 {
908 return Err(ShellError::Generic(GenericError::new(
909 "Cannot combine file, --inline, and --topic",
910 "use exactly one of: file path, --inline, or --topic",
911 head,
912 )));
913 }
914 if mode_count == 0 {
915 return Err(ShellError::Generic(GenericError::new(
916 "No template specified",
917 "provide a file path, --inline, or --topic",
918 head,
919 )));
920 }
921
922 let hash = if let Some(ref path) = file {
923 let path = std::path::Path::new(path);
925 let abs_path = if path.is_absolute() {
926 path.to_path_buf()
927 } else {
928 std::env::current_dir().unwrap_or_default().join(path)
929 };
930 let base_dir = abs_path.parent().unwrap_or(&abs_path).to_path_buf();
931 let source = std::fs::read_to_string(&abs_path).map_err(|e| {
932 ShellError::Generic(GenericError::new(
933 format!("Failed to read template file: {e}"),
934 "could not read file",
935 head,
936 ))
937 })?;
938 compile_template(&source, &base_dir)
939 } else if let Some(ref topic_name) = topic {
940 #[cfg(feature = "cross-stream")]
942 {
943 let store = self.store.as_ref().ok_or_else(|| {
944 ShellError::Generic(GenericError::new(
945 "--topic requires --store",
946 "server must be started with --store to use --topic",
947 head,
948 ))
949 })?;
950 let source = load_topic_content(store, topic_name).ok_or_else(|| {
951 ShellError::Generic(GenericError::new(
952 format!("Topic not found: {topic_name}"),
953 "no content in store for this topic",
954 head,
955 ))
956 })?;
957 let topic_store = store.clone();
958 let base_dir = std::path::PathBuf::from(format!("__topic__/{topic_name}"));
960 compile_template_with_loader(&source, &base_dir, move |name: &str| {
961 Ok(load_topic_content(&topic_store, name))
962 })
963 }
964 #[cfg(not(feature = "cross-stream"))]
965 {
966 let _ = topic_name;
967 return Err(ShellError::Generic(GenericError::new(
968 "--topic requires cross-stream feature",
969 "built without store support",
970 head,
971 )));
972 }
973 } else {
974 let source = inline_str.unwrap();
976 let base_dir = std::path::PathBuf::from("__inline__");
977 compile_template_with_loader(&source, &base_dir, |_| Ok(None))
978 };
979
980 let hash = hash.map_err(|e| {
981 ShellError::Generic(GenericError::new(
982 format!("Template compile error: {e}"),
983 e.to_string(),
984 head,
985 ))
986 })?;
987
988 Ok(Value::custom(Box::new(CompiledTemplate { hash }), head).into_pipeline_data())
989 }
990}
991
/// The `.mj render` command: renders a template produced by `.mj compile`.
#[derive(Clone)]
pub struct MjRenderCommand;

impl MjRenderCommand {
    /// Creates the command; it carries no state.
    pub fn new() -> Self {
        MjRenderCommand
    }
}

impl Default for MjRenderCommand {
    fn default() -> Self {
        MjRenderCommand::new()
    }
}
1008
1009impl Command for MjRenderCommand {
1010 fn name(&self) -> &str {
1011 ".mj render"
1012 }
1013
1014 fn description(&self) -> &str {
1015 "Render a compiled minijinja template with context from input"
1016 }
1017
1018 fn signature(&self) -> Signature {
1019 Signature::build(".mj render")
1020 .required(
1021 "template",
1022 SyntaxShape::Any,
1023 "compiled template from '.mj compile'",
1024 )
1025 .input_output_types(vec![(Type::Record(vec![].into()), Type::String)])
1026 .category(Category::Custom("http".into()))
1027 }
1028
1029 fn run(
1030 &self,
1031 engine_state: &EngineState,
1032 stack: &mut Stack,
1033 call: &Call,
1034 input: PipelineData,
1035 ) -> Result<PipelineData, ShellError> {
1036 let head = call.head;
1037 let template_val: Value = call.req(engine_state, stack, 0)?;
1038
1039 let compiled = match template_val {
1041 Value::Custom { val, .. } => val
1042 .as_any()
1043 .downcast_ref::<CompiledTemplate>()
1044 .ok_or_else(|| ShellError::TypeMismatch {
1045 err_message: "expected CompiledTemplate".into(),
1046 span: head,
1047 })?
1048 .clone(),
1049 _ => {
1050 return Err(ShellError::TypeMismatch {
1051 err_message: "expected CompiledTemplate from '.mj compile'".into(),
1052 span: head,
1053 });
1054 }
1055 };
1056
1057 let context = match input {
1059 PipelineData::Value(val, _) => nu_value_to_minijinja(&val),
1060 PipelineData::Empty => minijinja::Value::from(()),
1061 _ => {
1062 return Err(ShellError::TypeMismatch {
1063 err_message: "expected record input".into(),
1064 span: head,
1065 });
1066 }
1067 };
1068
1069 let rendered = compiled.render(&context).map_err(|e| {
1071 ShellError::Generic(GenericError::new(
1072 format!("Template render error: {e}"),
1073 e.to_string(),
1074 head,
1075 ))
1076 })?;
1077
1078 Ok(Value::string(rendered, head).into_pipeline_data())
1079 }
1080}
1081
1082struct SyntaxHighlighter {
1085 syntax_set: SyntaxSet,
1086}
1087
1088impl SyntaxHighlighter {
1089 fn new() -> Self {
1090 const SYNTAX_SET: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/syntax_set.bin"));
1091 let syntax_set = syntect::dumps::from_binary(SYNTAX_SET);
1092 Self { syntax_set }
1093 }
1094
1095 fn highlight(&self, code: &str, lang: Option<&str>) -> String {
1096 let syntax = match lang {
1097 Some(lang) => self
1098 .syntax_set
1099 .find_syntax_by_token(lang)
1100 .or_else(|| self.syntax_set.find_syntax_by_extension(lang)),
1101 None => None,
1102 }
1103 .unwrap_or_else(|| self.syntax_set.find_syntax_plain_text());
1104
1105 let mut html_generator = ClassedHTMLGenerator::new_with_class_style(
1106 syntax,
1107 &self.syntax_set,
1108 ClassStyle::Spaced,
1109 );
1110
1111 for line in LinesWithEndings::from(code) {
1112 let _ = html_generator.parse_html_for_line_which_includes_newline(line);
1113 }
1114
1115 html_generator.finalize()
1116 }
1117
1118 fn list_syntaxes(&self) -> Vec<(String, Vec<String>)> {
1119 self.syntax_set
1120 .syntaxes()
1121 .iter()
1122 .map(|s| (s.name.clone(), s.file_extensions.clone()))
1123 .collect()
1124 }
1125}
1126
1127static HIGHLIGHTER: OnceLock<SyntaxHighlighter> = OnceLock::new();
1128
1129fn get_highlighter() -> &'static SyntaxHighlighter {
1130 HIGHLIGHTER.get_or_init(SyntaxHighlighter::new)
1131}
1132
/// The `.highlight` command: syntax-highlights code as HTML.
#[derive(Clone)]
pub struct HighlightCommand;

impl HighlightCommand {
    /// Creates the command; it carries no state.
    pub fn new() -> Self {
        HighlightCommand
    }
}

impl Default for HighlightCommand {
    fn default() -> Self {
        HighlightCommand::new()
    }
}
1149
1150impl Command for HighlightCommand {
1151 fn name(&self) -> &str {
1152 ".highlight"
1153 }
1154
1155 fn description(&self) -> &str {
1156 "Syntax highlight code, outputting HTML with CSS classes"
1157 }
1158
1159 fn signature(&self) -> Signature {
1160 Signature::build(".highlight")
1161 .required("lang", SyntaxShape::String, "language for highlighting")
1162 .input_output_types(vec![(Type::String, Type::record())])
1163 .category(Category::Custom("http".into()))
1164 }
1165
1166 fn run(
1167 &self,
1168 engine_state: &EngineState,
1169 stack: &mut Stack,
1170 call: &Call,
1171 input: PipelineData,
1172 ) -> Result<PipelineData, ShellError> {
1173 let head = call.head;
1174 let lang: String = call.req(engine_state, stack, 0)?;
1175
1176 let code = match input {
1177 PipelineData::Value(Value::String { val, .. }, _) => val,
1178 PipelineData::ByteStream(stream, _) => stream.into_string()?,
1179 _ => {
1180 return Err(ShellError::TypeMismatch {
1181 err_message: "expected string input".into(),
1182 span: head,
1183 });
1184 }
1185 };
1186
1187 let highlighter = get_highlighter();
1188 let html = highlighter.highlight(&code, Some(&lang));
1189
1190 Ok(Value::record(
1191 nu_protocol::record! {
1192 "__html" => Value::string(html, head),
1193 },
1194 head,
1195 )
1196 .into_pipeline_data())
1197 }
1198}
1199
/// The `.highlight theme` command: lists themes or emits a theme's CSS.
#[derive(Clone)]
pub struct HighlightThemeCommand;

impl HighlightThemeCommand {
    /// Creates the command; it carries no state.
    pub fn new() -> Self {
        HighlightThemeCommand
    }
}

impl Default for HighlightThemeCommand {
    fn default() -> Self {
        HighlightThemeCommand::new()
    }
}
1216
1217impl Command for HighlightThemeCommand {
1218 fn name(&self) -> &str {
1219 ".highlight theme"
1220 }
1221
1222 fn description(&self) -> &str {
1223 "List available themes or get CSS for a specific theme"
1224 }
1225
1226 fn signature(&self) -> Signature {
1227 Signature::build(".highlight theme")
1228 .optional("name", SyntaxShape::String, "theme name (omit to list all)")
1229 .input_output_types(vec![
1230 (Type::Nothing, Type::List(Box::new(Type::String))),
1231 (Type::Nothing, Type::String),
1232 ])
1233 .category(Category::Custom("http".into()))
1234 }
1235
1236 fn run(
1237 &self,
1238 engine_state: &EngineState,
1239 stack: &mut Stack,
1240 call: &Call,
1241 _input: PipelineData,
1242 ) -> Result<PipelineData, ShellError> {
1243 let head = call.head;
1244 let name: Option<String> = call.opt(engine_state, stack, 0)?;
1245
1246 let assets = syntect_assets::assets::HighlightingAssets::from_binary();
1247
1248 match name {
1249 None => {
1250 let themes: Vec<Value> = assets.themes().map(|t| Value::string(t, head)).collect();
1251 Ok(Value::list(themes, head).into_pipeline_data())
1252 }
1253 Some(theme_name) => {
1254 let theme = assets.get_theme(&theme_name);
1255 let css = syntect::html::css_for_theme_with_class_style(theme, ClassStyle::Spaced)
1256 .map_err(|e| {
1257 ShellError::Generic(GenericError::new(
1258 format!("Failed to generate CSS: {e}"),
1259 e.to_string(),
1260 head,
1261 ))
1262 })?;
1263 Ok(Value::string(css, head).into_pipeline_data())
1264 }
1265 }
1266 }
1267}
1268
/// Command type registered under the name `.highlight lang`.
///
/// Stateless unit struct: `new()` and `Default::default()` yield the same value.
#[derive(Clone, Default)]
pub struct HighlightLangCommand;

impl HighlightLangCommand {
    /// Creates the command value.
    pub fn new() -> Self {
        Self
    }
}
1285
1286impl Command for HighlightLangCommand {
1287 fn name(&self) -> &str {
1288 ".highlight lang"
1289 }
1290
1291 fn description(&self) -> &str {
1292 "List available languages for syntax highlighting"
1293 }
1294
1295 fn signature(&self) -> Signature {
1296 Signature::build(".highlight lang")
1297 .input_output_types(vec![(Type::Nothing, Type::List(Box::new(Type::record())))])
1298 .category(Category::Custom("http".into()))
1299 }
1300
1301 fn run(
1302 &self,
1303 _engine_state: &EngineState,
1304 _stack: &mut Stack,
1305 call: &Call,
1306 _input: PipelineData,
1307 ) -> Result<PipelineData, ShellError> {
1308 let head = call.head;
1309 let highlighter = get_highlighter();
1310 let langs: Vec<Value> = highlighter
1311 .list_syntaxes()
1312 .into_iter()
1313 .map(|(name, exts)| {
1314 Value::record(
1315 nu_protocol::record! {
1316 "name" => Value::string(name, head),
1317 "extensions" => Value::list(
1318 exts.into_iter().map(|e| Value::string(e, head)).collect(),
1319 head
1320 ),
1321 },
1322 head,
1323 )
1324 })
1325 .collect();
1326 Ok(Value::list(langs, head).into_pipeline_data())
1327 }
1328}
1329
1330use pulldown_cmark::{html, CodeBlockKind, Event, Parser as MarkdownParser, Tag, TagEnd};
1333
/// Command type registered under the name `.md`.
///
/// Stateless unit struct: `new()` and `Default::default()` yield the same value.
#[derive(Clone, Default)]
pub struct MdCommand;

impl MdCommand {
    /// Creates the command value.
    pub fn new() -> Self {
        Self
    }
}
1348
1349impl Command for MdCommand {
1350 fn name(&self) -> &str {
1351 ".md"
1352 }
1353
1354 fn description(&self) -> &str {
1355 "Convert Markdown to HTML with syntax-highlighted code blocks"
1356 }
1357
1358 fn signature(&self) -> Signature {
1359 Signature::build(".md")
1360 .input_output_types(vec![
1361 (Type::String, Type::record()),
1362 (Type::record(), Type::record()),
1363 ])
1364 .category(Category::Custom("http".into()))
1365 }
1366
1367 fn run(
1368 &self,
1369 _engine_state: &EngineState,
1370 _stack: &mut Stack,
1371 call: &Call,
1372 input: PipelineData,
1373 ) -> Result<PipelineData, ShellError> {
1374 let head = call.head;
1375
1376 let (markdown, trusted) = match input.into_value(head)? {
1378 Value::String { val, .. } => (val, false),
1379 Value::Record { val, .. } => {
1380 if let Some(html_val) = val.get("__html") {
1381 (html_val.as_str()?.to_string(), true)
1382 } else {
1383 return Err(ShellError::TypeMismatch {
1384 err_message: "expected string or {__html: ...}".into(),
1385 span: head,
1386 });
1387 }
1388 }
1389 other => {
1390 return Err(ShellError::TypeMismatch {
1391 err_message: format!(
1392 "expected string or {{__html: ...}}, got {}",
1393 other.get_type()
1394 ),
1395 span: head,
1396 });
1397 }
1398 };
1399
1400 let highlighter = get_highlighter();
1401
1402 let mut in_code_block = false;
1403 let mut current_code = String::new();
1404 let mut current_lang: Option<String> = None;
1405
1406 let mut options = pulldown_cmark::Options::empty();
1407 options.insert(pulldown_cmark::Options::ENABLE_TABLES);
1408 options.insert(pulldown_cmark::Options::ENABLE_STRIKETHROUGH);
1409 options.insert(pulldown_cmark::Options::ENABLE_TASKLISTS);
1410 options.insert(pulldown_cmark::Options::ENABLE_FOOTNOTES);
1411 options.insert(pulldown_cmark::Options::ENABLE_HEADING_ATTRIBUTES);
1412 options.insert(pulldown_cmark::Options::ENABLE_GFM);
1413 options.insert(pulldown_cmark::Options::ENABLE_DEFINITION_LIST);
1414 let parser = MarkdownParser::new_ext(&markdown, options).map(|event| match event {
1415 Event::Start(Tag::CodeBlock(kind)) => {
1416 in_code_block = true;
1417 current_code.clear();
1418 current_lang = match kind {
1419 CodeBlockKind::Fenced(info) => {
1420 let lang = info.split_whitespace().next().unwrap_or("");
1421 if lang.is_empty() {
1422 None
1423 } else {
1424 Some(lang.to_string())
1425 }
1426 }
1427 CodeBlockKind::Indented => None,
1428 };
1429 Event::Text("".into())
1430 }
1431 Event::End(TagEnd::CodeBlock) => {
1432 in_code_block = false;
1433 let highlighted = highlighter.highlight(¤t_code, current_lang.as_deref());
1434 let mut html_out = String::new();
1435 html_out.push_str("<pre><code");
1436 if let Some(lang) = ¤t_lang {
1437 let lang = v_htmlescape::escape(lang);
1438 html_out.push_str(&format!(" class=\"language-{lang}\""));
1439 }
1440 html_out.push('>');
1441 html_out.push_str(&highlighted);
1442 html_out.push_str("</code></pre>");
1443 Event::Html(html_out.into())
1444 }
1445 Event::Text(text) => {
1446 if in_code_block {
1447 current_code.push_str(&text);
1448 Event::Text("".into())
1449 } else {
1450 Event::Text(text)
1451 }
1452 }
1453 Event::Html(html) => {
1455 if trusted {
1456 Event::Html(html)
1457 } else {
1458 Event::Text(html) }
1460 }
1461 Event::InlineHtml(html) => {
1462 if trusted {
1463 Event::InlineHtml(html)
1464 } else {
1465 Event::Text(html)
1466 }
1467 }
1468 e => e,
1469 });
1470
1471 let mut html_output = String::new();
1472 html::push_html(&mut html_output, parser);
1473
1474 Ok(Value::record(
1475 nu_protocol::record! {
1476 "__html" => Value::string(html_output, head),
1477 },
1478 head,
1479 )
1480 .into_pipeline_data())
1481 }
1482}
1483
/// Command type registered under the name `print`.
///
/// Stateless unit struct: `new()` and `Default::default()` yield the same value.
#[derive(Clone, Default)]
pub struct PrintCommand;

impl PrintCommand {
    /// Creates the command value.
    pub fn new() -> Self {
        Self
    }
}
1500
1501impl Command for PrintCommand {
1502 fn name(&self) -> &str {
1503 "print"
1504 }
1505
1506 fn description(&self) -> &str {
1507 "Print the given values to the http-nu logging system."
1508 }
1509
1510 fn extra_description(&self) -> &str {
1511 r#"This command outputs to http-nu's logging system rather than stdout/stderr.
1512Messages appear in both human-readable and JSONL output modes.
1513
1514`print` may be used inside blocks of code (e.g.: hooks) to display text during execution without interfering with the pipeline."#
1515 }
1516
1517 fn search_terms(&self) -> Vec<&str> {
1518 vec!["display"]
1519 }
1520
1521 fn signature(&self) -> Signature {
1522 Signature::build("print")
1523 .input_output_types(vec![
1524 (Type::Nothing, Type::Nothing),
1525 (Type::Any, Type::Nothing),
1526 ])
1527 .allow_variants_without_examples(true)
1528 .rest("rest", SyntaxShape::Any, "the values to print")
1529 .switch(
1530 "no-newline",
1531 "print without inserting a newline for the line ending",
1532 Some('n'),
1533 )
1534 .switch("stderr", "print to stderr instead of stdout", Some('e'))
1535 .switch(
1536 "raw",
1537 "print without formatting (including binary data)",
1538 Some('r'),
1539 )
1540 .category(Category::Strings)
1541 }
1542
1543 fn run(
1544 &self,
1545 engine_state: &EngineState,
1546 stack: &mut Stack,
1547 call: &Call,
1548 input: PipelineData,
1549 ) -> Result<PipelineData, ShellError> {
1550 let args: Vec<Value> = call.rest(engine_state, stack, 0)?;
1551 let no_newline = call.has_flag(engine_state, stack, "no-newline")?;
1552 let config = stack.get_config(engine_state);
1553
1554 let format_value = |val: &Value| -> String { val.to_expanded_string(" ", &config) };
1555
1556 if !args.is_empty() {
1557 let messages: Vec<String> = args.iter().map(format_value).collect();
1558 let message = if no_newline {
1559 messages.join("")
1560 } else {
1561 messages.join("\n")
1562 };
1563 log_print(&message);
1564 } else if !input.is_nothing() {
1565 let message = match input {
1566 PipelineData::Value(val, _) => format_value(&val),
1567 PipelineData::ListStream(stream, _) => {
1568 let vals: Vec<String> = stream.into_iter().map(|v| format_value(&v)).collect();
1569 vals.join("\n")
1570 }
1571 PipelineData::ByteStream(stream, _) => stream.into_string()?,
1572 PipelineData::Empty => String::new(),
1573 };
1574 if !message.is_empty() {
1575 log_print(&message);
1576 }
1577 }
1578
1579 Ok(PipelineData::empty())
1580 }
1581
1582 fn examples(&self) -> Vec<Example<'_>> {
1583 vec![
1584 Example {
1585 description: "Print 'hello world'",
1586 example: r#"print "hello world""#,
1587 result: None,
1588 },
1589 Example {
1590 description: "Print the sum of 2 and 3",
1591 example: r#"print (2 + 3)"#,
1592 result: None,
1593 },
1594 ]
1595 }
1596}
1597
/// Command type registered under the name `.run`.
///
/// Stateless unit struct: `new()` and `Default::default()` yield the same value.
#[derive(Clone, Default)]
pub struct RunNuCommand;

impl RunNuCommand {
    /// Creates the command value.
    pub fn new() -> Self {
        Self
    }
}
1614
1615impl Command for RunNuCommand {
1616 fn name(&self) -> &str {
1617 ".run"
1618 }
1619
1620 fn description(&self) -> &str {
1621 "Parse, compile, and evaluate a nushell pipeline string in a sandboxed engine state."
1622 }
1623
1624 fn extra_description(&self) -> &str {
1625 r#"The submitted script is parsed and compiled against a clone of the current engine state.
1626Any defs, lets, or modules introduced by the script live only for the duration of the call --
1627they do not leak back into the calling engine. Pipeline input is forwarded to the script as `$in`;
1628the caller's local bindings and environment are not visible inside the sandbox."#
1629 }
1630
1631 fn signature(&self) -> Signature {
1632 Signature::build(".run")
1633 .input_output_types(vec![(Type::Any, Type::Any)])
1634 .required("script", SyntaxShape::String, "nushell source to evaluate")
1635 .category(Category::Experimental)
1636 }
1637
1638 fn run(
1639 &self,
1640 engine_state: &EngineState,
1641 stack: &mut Stack,
1642 call: &Call,
1643 input: PipelineData,
1644 ) -> Result<PipelineData, ShellError> {
1645 use nu_engine::eval_block_with_early_return;
1646 use nu_parser::parse;
1647 use nu_protocol::{debugger::WithoutDebug, engine::StateWorkingSet, format_cli_error};
1648
1649 let script: String = call.req(engine_state, stack, 0)?;
1650
1651 let mut ws = StateWorkingSet::new(engine_state);
1652 let block = parse(&mut ws, Some("<.run>"), script.as_bytes(), false);
1653
1654 if !ws.parse_errors.is_empty() {
1655 let mut combined = String::new();
1656 for err in &ws.parse_errors {
1657 let se = ShellError::Generic(GenericError::new(
1658 "Parse error",
1659 format!("{err}"),
1660 err.span(),
1661 ));
1662 combined.push_str(&format_cli_error(None, &ws, &se, None));
1663 combined.push('\n');
1664 }
1665 let plain = nu_utils::strip_ansi_string_likely(combined);
1666 return Err(ShellError::Generic(GenericError::new_internal(plain, "")));
1667 }
1668 if !ws.compile_errors.is_empty() {
1669 let mut combined = String::new();
1670 for err in &ws.compile_errors {
1671 let se = ShellError::Generic(GenericError::new_internal(
1672 format!("Compile error {err}"),
1673 "",
1674 ));
1675 combined.push_str(&format_cli_error(None, &ws, &se, None));
1676 combined.push('\n');
1677 }
1678 let plain = nu_utils::strip_ansi_string_likely(combined);
1679 return Err(ShellError::Generic(GenericError::new_internal(plain, "")));
1680 }
1681
1682 let mut sandbox = engine_state.clone();
1683 sandbox.merge_delta(ws.render())?;
1684
1685 let mut sub_stack = Stack::new();
1686 eval_block_with_early_return::<WithoutDebug>(&sandbox, &mut sub_stack, &block, input)
1687 .map(|exec| exec.body)
1688 }
1689}
1690
1691#[derive(Clone)]
1694pub struct BusPubCommand {
1695 bus: Arc<Bus>,
1696}
1697
1698impl BusPubCommand {
1699 pub fn new(bus: Arc<Bus>) -> Self {
1700 Self { bus }
1701 }
1702}
1703
1704impl Command for BusPubCommand {
1705 fn name(&self) -> &str {
1706 ".bus pub"
1707 }
1708
1709 fn description(&self) -> &str {
1710 "Publish a value to an ephemeral in-process topic."
1711 }
1712
1713 fn signature(&self) -> Signature {
1714 Signature::build(".bus pub")
1715 .input_output_types(vec![(Type::Any, Type::Nothing)])
1716 .required("topic", SyntaxShape::String, "topic to publish to")
1717 .category(Category::Experimental)
1718 }
1719
1720 fn run(
1721 &self,
1722 engine_state: &EngineState,
1723 stack: &mut Stack,
1724 call: &Call,
1725 input: PipelineData,
1726 ) -> Result<PipelineData, ShellError> {
1727 let topic: String = call.req(engine_state, stack, 0)?;
1728 let value = input.into_value(call.head)?;
1729 self.bus.publish(topic, value);
1730 Ok(PipelineData::empty())
1731 }
1732}
1733
1734#[derive(Clone)]
1735pub struct BusSubCommand {
1736 bus: Arc<Bus>,
1737}
1738
1739impl BusSubCommand {
1740 pub fn new(bus: Arc<Bus>) -> Self {
1741 Self { bus }
1742 }
1743}
1744
1745impl Command for BusSubCommand {
1746 fn name(&self) -> &str {
1747 ".bus sub"
1748 }
1749
1750 fn description(&self) -> &str {
1751 "Subscribe to ephemeral in-process topics; yields {topic, value} records."
1752 }
1753
1754 fn extra_description(&self) -> &str {
1755 r#"With no argument, yields every published event. With a glob pattern (e.g. "tab-abc.*"),
1756yields only events whose topic matches. `*` matches any run of characters including dots."#
1757 }
1758
1759 fn signature(&self) -> Signature {
1760 Signature::build(".bus sub")
1761 .input_output_types(vec![(Type::Nothing, Type::Any)])
1762 .optional(
1763 "pattern",
1764 SyntaxShape::String,
1765 "glob pattern; omit for all topics",
1766 )
1767 .category(Category::Experimental)
1768 }
1769
1770 fn run(
1771 &self,
1772 engine_state: &EngineState,
1773 stack: &mut Stack,
1774 call: &Call,
1775 _input: PipelineData,
1776 ) -> Result<PipelineData, ShellError> {
1777 use nu_protocol::{record, ListStream, Signals};
1778
1779 let pattern: Option<String> = call.opt(engine_state, stack, 0)?;
1780 let span = call.head;
1781 let signals = engine_state.signals().clone();
1782
1783 let mut sub = self.bus.subscribe(pattern);
1784
1785 let (tx, rx) = std::sync::mpsc::channel::<crate::bus::BusEvent>();
1786 std::thread::spawn(move || {
1787 let rt = match tokio::runtime::Runtime::new() {
1788 Ok(rt) => rt,
1789 Err(_) => return,
1790 };
1791 rt.block_on(async move {
1792 while let Some(ev) = sub.recv().await {
1793 if tx.send(ev).is_err() {
1794 break;
1795 }
1796 }
1797 });
1798 });
1799
1800 let stream = ListStream::new(
1801 std::iter::from_fn(move || {
1802 use std::sync::mpsc::RecvTimeoutError;
1803 use std::time::Duration;
1804 loop {
1805 if signals.interrupted() {
1806 return None;
1807 }
1808 match rx.recv_timeout(Duration::from_millis(100)) {
1809 Ok(ev) => {
1810 let rec = record! {
1811 "topic" => Value::string(ev.topic, span),
1812 "value" => ev.value,
1813 };
1814 return Some(Value::record(rec, span));
1815 }
1816 Err(RecvTimeoutError::Timeout) => continue,
1817 Err(RecvTimeoutError::Disconnected) => return None,
1818 }
1819 }
1820 }),
1821 span,
1822 Signals::empty(),
1823 );
1824
1825 Ok(PipelineData::ListStream(stream, None))
1826 }
1827}
1828
#[cfg(test)]
mod tests {
    use super::event_to_string;
    use nu_protocol::{record, Config, Value};

    /// A record holding only `comment` serializes to a single `: ` line
    /// followed by a blank line.
    #[test]
    fn comment_only_emits_sse_comment() {
        let config = Config::default();
        let event = Value::test_record(record! { "comment" => Value::test_string("hb") });

        let rendered = event_to_string(&config, event).unwrap();

        assert_eq!(rendered, ": hb\n\n");
    }

    /// When both `comment` and `data` are present, the comment line is
    /// written before the data line.
    #[test]
    fn comment_precedes_data() {
        let config = Config::default();
        let event = Value::test_record(record! {
            "comment" => Value::test_string("hb"),
            "data" => Value::test_string("x"),
        });

        let rendered = event_to_string(&config, event).unwrap();

        assert_eq!(rendered, ": hb\ndata: x\n\n");
    }
}