1use crate::bus::Bus;
2use crate::logging::log_print;
3use crate::response::{Response, ResponseBodyType};
4use nu_engine::command_prelude::*;
5use nu_protocol::{
6 shell_error::generic::GenericError, ByteStream, ByteStreamType, Category, Config, CustomValue,
7 PipelineData, PipelineMetadata, ShellError, Signature, Span, SyntaxShape, Type, Value,
8};
9use serde::{Deserialize, Serialize};
10use std::cell::RefCell;
11use std::collections::HashMap;
12use std::io::Read;
13use std::path::PathBuf;
14use tokio::sync::oneshot;
15
16use minijinja::{path_loader, AutoEscape, Environment};
17use std::sync::{Arc, OnceLock, RwLock};
18
19use syntect::html::{ClassStyle, ClassedHTMLGenerator};
20use syntect::parsing::SyntaxSet;
21use syntect::util::LinesWithEndings;
22
23type TemplateCache = RwLock<HashMap<u128, Arc<Environment<'static>>>>;
26
27static TEMPLATE_CACHE: OnceLock<TemplateCache> = OnceLock::new();
28
29fn get_cache() -> &'static TemplateCache {
30 TEMPLATE_CACHE.get_or_init(|| RwLock::new(HashMap::new()))
31}
32
33fn hash_source_and_path(source: &str, base_dir: &std::path::Path) -> u128 {
34 let mut data = source.as_bytes().to_vec();
35 data.extend_from_slice(base_dir.to_string_lossy().as_bytes());
36 xxhash_rust::xxh3::xxh3_128(&data)
37}
38
39fn compile_template(source: &str, base_dir: &std::path::Path) -> Result<u128, minijinja::Error> {
41 compile_template_with_loader(source, base_dir, path_loader(base_dir))
42}
43
44fn compile_template_with_loader<F>(
46 source: &str,
47 base_dir: &std::path::Path,
48 loader: F,
49) -> Result<u128, minijinja::Error>
50where
51 F: Fn(&str) -> Result<Option<String>, minijinja::Error> + Send + Sync + 'static,
52{
53 let hash = hash_source_and_path(source, base_dir);
54
55 let mut cache = get_cache().write().unwrap();
56 if cache.contains_key(&hash) {
57 return Ok(hash);
58 }
59
60 let mut env = Environment::new();
61 env.set_auto_escape_callback(|_| AutoEscape::Html);
62 env.set_loader(loader);
63 env.add_template_owned("template".to_string(), source.to_string())?;
64 cache.insert(hash, Arc::new(env));
65 Ok(hash)
66}
67
68fn get_compiled(hash: u128) -> Option<Arc<Environment<'static>>> {
70 get_cache().read().unwrap().get(&hash).map(Arc::clone)
71}
72
/// Reads the content of the last frame stored under `topic` as UTF-8 text.
///
/// Returns `None` when the read yields no frame, the frame has no hash, the
/// content cannot be read from the store, or the bytes are not valid UTF-8.
#[cfg(feature = "cross-stream")]
fn load_topic_content(store: &xs::store::Store, topic: &str) -> Option<String> {
    let read_options = xs::store::ReadOptions::builder()
        .follow(xs::store::FollowOption::Off)
        .topic(topic.to_string())
        .last(1_usize)
        .build();

    let latest_frame = store.read_sync(read_options).last()?;
    let content_hash = latest_frame.hash?;
    store
        .cas_read_sync(&content_hash)
        .ok()
        .and_then(|bytes| String::from_utf8(bytes).ok())
}
86
87#[derive(Clone, Debug, Serialize, Deserialize)]
90pub struct CompiledTemplate {
91 hash: u128,
92}
93
94impl CompiledTemplate {
95 pub fn render(&self, context: &minijinja::Value) -> Result<String, minijinja::Error> {
97 let env = get_compiled(self.hash).expect("template not in cache");
98 let tmpl = env.get_template("template")?;
99 tmpl.render(context)
100 }
101}
102
103#[typetag::serde]
104impl CustomValue for CompiledTemplate {
105 fn clone_value(&self, span: Span) -> Value {
106 Value::custom(Box::new(self.clone()), span)
107 }
108
109 fn type_name(&self) -> String {
110 "CompiledTemplate".into()
111 }
112
113 fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
114 Ok(Value::string(format!("{:032x}", self.hash), span))
115 }
116
117 fn as_any(&self) -> &dyn std::any::Any {
118 self
119 }
120
121 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
122 self
123 }
124}
125
126thread_local! {
127 pub static RESPONSE_TX: RefCell<Option<oneshot::Sender<Response>>> = const { RefCell::new(None) };
128}
129
/// Stateless type implementing the `.static` command.
#[derive(Clone)]
pub struct StaticCommand;

impl StaticCommand {
    /// Creates the command.
    pub fn new() -> Self {
        StaticCommand
    }
}

impl Default for StaticCommand {
    /// Same as [`StaticCommand::new`].
    fn default() -> Self {
        StaticCommand::new()
    }
}
144
145impl Command for StaticCommand {
146 fn name(&self) -> &str {
147 ".static"
148 }
149
150 fn description(&self) -> &str {
151 "Serve static files from a directory"
152 }
153
154 fn signature(&self) -> Signature {
155 Signature::build(".static")
156 .required("root", SyntaxShape::String, "root directory path")
157 .required("path", SyntaxShape::String, "request path")
158 .named(
159 "fallback",
160 SyntaxShape::String,
161 "fallback file when request missing",
162 None,
163 )
164 .input_output_types(vec![(Type::Nothing, Type::Nothing)])
165 .category(Category::Custom("http".into()))
166 }
167
168 fn run(
169 &self,
170 engine_state: &EngineState,
171 stack: &mut Stack,
172 call: &Call,
173 _input: PipelineData,
174 ) -> Result<PipelineData, ShellError> {
175 let root: String = call.req(engine_state, stack, 0)?;
176 let path: String = call.req(engine_state, stack, 1)?;
177
178 let fallback: Option<String> = call.get_flag(engine_state, stack, "fallback")?;
179
180 let response = Response {
181 status: 200,
182 headers: HashMap::new(),
183 body_type: ResponseBodyType::Static {
184 root: PathBuf::from(root),
185 path,
186 fallback,
187 },
188 };
189
190 RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
191 if let Some(tx) = tx.borrow_mut().take() {
192 tx.send(response).map_err(|_| {
193 ShellError::Generic(GenericError::new(
194 "Failed to send response",
195 "Channel closed",
196 call.head,
197 ))
198 })?;
199 }
200 Ok(())
201 })?;
202
203 Ok(PipelineData::Empty)
204 }
205}
206
/// Terminator appended to every line written into an event-stream.
const LINE_ENDING: &str = "\n";

/// Stateless type implementing the `to sse` command.
#[derive(Clone)]
pub struct ToSse;
211
212impl Command for ToSse {
213 fn name(&self) -> &str {
214 "to sse"
215 }
216
217 fn signature(&self) -> Signature {
218 Signature::build("to sse")
219 .input_output_types(vec![
220 (Type::record(), Type::String),
221 (Type::List(Box::new(Type::record())), Type::String),
222 ])
223 .category(Category::Formats)
224 }
225
226 fn description(&self) -> &str {
227 "Convert records into text/event-stream format"
228 }
229
230 fn search_terms(&self) -> Vec<&str> {
231 vec!["sse", "server", "event"]
232 }
233
234 fn examples(&self) -> Vec<Example<'_>> {
235 vec![Example {
236 description: "Convert a record into a server-sent event",
237 example: "{data: 'hello'} | to sse",
238 result: Some(Value::test_string("data: hello\n\n")),
239 }]
240 }
241
242 fn run(
243 &self,
244 engine_state: &EngineState,
245 stack: &mut Stack,
246 call: &Call,
247 input: PipelineData,
248 ) -> Result<PipelineData, ShellError> {
249 let head = call.head;
250 let config = stack.get_config(engine_state);
251 match input {
252 PipelineData::ListStream(stream, meta) => {
253 let span = stream.span();
254 let cfg = config.clone();
255 let iter = stream
256 .into_iter()
257 .map(move |val| event_to_string(&cfg, val));
258 let stream = ByteStream::from_result_iter(
259 iter,
260 span,
261 engine_state.signals().clone(),
262 ByteStreamType::String,
263 );
264 Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
265 }
266 PipelineData::Value(Value::List { vals, .. }, meta) => {
267 let cfg = config.clone();
268 let iter = vals.into_iter().map(move |val| event_to_string(&cfg, val));
269 let span = head;
270 let stream = ByteStream::from_result_iter(
271 iter,
272 span,
273 engine_state.signals().clone(),
274 ByteStreamType::String,
275 );
276 Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
277 }
278 PipelineData::Value(val, meta) => {
279 let out = event_to_string(&config, val)?;
280 Ok(
281 Value::string(out, head)
282 .into_pipeline_data_with_metadata(update_metadata(meta)),
283 )
284 }
285 PipelineData::Empty => Ok(PipelineData::Value(
286 Value::string(String::new(), head),
287 update_metadata(None),
288 )),
289 PipelineData::ByteStream(..) => Err(ShellError::TypeMismatch {
290 err_message: "expected record input".into(),
291 span: head,
292 }),
293 }
294 }
295}
296
297fn emit_data_lines(out: &mut String, s: &str) {
298 for line in s.lines() {
299 out.push_str("data: ");
300 out.push_str(line);
301 out.push_str(LINE_ENDING);
302 }
303}
304
305#[allow(clippy::result_large_err)]
306fn value_to_data_string(val: &Value, config: &Config) -> Result<String, ShellError> {
307 match val {
308 Value::String { val, .. } => Ok(val.clone()),
309 _ => {
310 let json_value = value_to_json(val, config).map_err(|err| {
311 ShellError::Generic(GenericError::new(
312 err.to_string(),
313 "failed to serialize json",
314 Span::unknown(),
315 ))
316 })?;
317 serde_json::to_string(&json_value).map_err(|err| {
318 ShellError::Generic(GenericError::new(
319 err.to_string(),
320 "failed to serialize json",
321 Span::unknown(),
322 ))
323 })
324 }
325 }
326}
327
328#[allow(clippy::result_large_err)]
329fn event_to_string(config: &Config, val: Value) -> Result<String, ShellError> {
330 let span = val.span();
331 let rec = match val {
332 Value::Record { val, .. } => val,
333 Value::Error { error, .. } => return Err(*error),
335 other => {
336 return Err(ShellError::TypeMismatch {
337 err_message: format!("expected record, got {}", other.get_type()),
338 span,
339 })
340 }
341 };
342 let mut out = String::new();
343 if let Some(event) = rec.get("event") {
344 if !matches!(event, Value::Nothing { .. }) {
345 out.push_str("event: ");
346 out.push_str(&event.to_expanded_string("", config));
347 out.push_str(LINE_ENDING);
348 }
349 }
350 if let Some(id) = rec.get("id") {
351 if !matches!(id, Value::Nothing { .. }) {
352 out.push_str("id: ");
353 out.push_str(&id.to_expanded_string("", config));
354 out.push_str(LINE_ENDING);
355 }
356 }
357 if let Some(retry) = rec.get("retry") {
358 if !matches!(retry, Value::Nothing { .. }) {
359 out.push_str("retry: ");
360 out.push_str(&retry.to_expanded_string("", config));
361 out.push_str(LINE_ENDING);
362 }
363 }
364 if let Some(data) = rec.get("data") {
365 if !matches!(data, Value::Nothing { .. }) {
366 match data {
367 Value::List { vals, .. } => {
368 for item in vals {
369 emit_data_lines(&mut out, &value_to_data_string(item, config)?);
370 }
371 }
372 _ => {
373 emit_data_lines(&mut out, &value_to_data_string(data, config)?);
374 }
375 }
376 }
377 }
378 out.push_str(LINE_ENDING);
379 Ok(out)
380}
381
382fn value_to_json(val: &Value, config: &Config) -> serde_json::Result<serde_json::Value> {
383 Ok(match val {
384 Value::Bool { val, .. } => serde_json::Value::Bool(*val),
385 Value::Int { val, .. } => serde_json::Value::from(*val),
386 Value::Float { val, .. } => serde_json::Number::from_f64(*val)
387 .map(serde_json::Value::Number)
388 .unwrap_or(serde_json::Value::Null),
389 Value::String { val, .. } => serde_json::Value::String(val.clone()),
390 Value::List { vals, .. } => serde_json::Value::Array(
391 vals.iter()
392 .map(|v| value_to_json(v, config))
393 .collect::<Result<Vec<_>, _>>()?,
394 ),
395 Value::Record { val, .. } => {
396 let mut map = serde_json::Map::new();
397 for (k, v) in val.iter() {
398 map.insert(k.clone(), value_to_json(v, config)?);
399 }
400 serde_json::Value::Object(map)
401 }
402 Value::Nothing { .. } => serde_json::Value::Null,
403 other => serde_json::Value::String(other.to_expanded_string("", config)),
404 })
405}
406
407fn update_metadata(metadata: Option<PipelineMetadata>) -> Option<PipelineMetadata> {
408 metadata
409 .map(|md| md.with_content_type(Some("text/event-stream".into())))
410 .or_else(|| {
411 Some(PipelineMetadata::default().with_content_type(Some("text/event-stream".into())))
412 })
413}
414
/// Stateless type implementing the `.reverse-proxy` command.
#[derive(Clone)]
pub struct ReverseProxyCommand;

impl ReverseProxyCommand {
    /// Creates the command.
    pub fn new() -> Self {
        ReverseProxyCommand
    }
}

impl Default for ReverseProxyCommand {
    /// Same as [`ReverseProxyCommand::new`].
    fn default() -> Self {
        ReverseProxyCommand::new()
    }
}
429
430impl Command for ReverseProxyCommand {
431 fn name(&self) -> &str {
432 ".reverse-proxy"
433 }
434
435 fn description(&self) -> &str {
436 "Forward HTTP requests to a backend server"
437 }
438
439 fn signature(&self) -> Signature {
440 Signature::build(".reverse-proxy")
441 .required("target_url", SyntaxShape::String, "backend URL to proxy to")
442 .optional(
443 "config",
444 SyntaxShape::Record(vec![]),
445 "optional configuration (headers, preserve_host, strip_prefix, query)",
446 )
447 .input_output_types(vec![(Type::Any, Type::Nothing)])
448 .category(Category::Custom("http".into()))
449 }
450
451 fn run(
452 &self,
453 engine_state: &EngineState,
454 stack: &mut Stack,
455 call: &Call,
456 input: PipelineData,
457 ) -> Result<PipelineData, ShellError> {
458 let target_url: String = call.req(engine_state, stack, 0)?;
459
460 let request_body = match input {
462 PipelineData::Empty => Vec::new(),
463 PipelineData::Value(value, _) => crate::response::value_to_bytes(value),
464 PipelineData::ByteStream(stream, _) => {
465 let mut body_bytes = Vec::new();
467 if let Some(mut reader) = stream.reader() {
468 loop {
469 let mut buffer = vec![0; 8192];
470 match reader.read(&mut buffer) {
471 Ok(0) => break, Ok(n) => {
473 buffer.truncate(n);
474 body_bytes.extend_from_slice(&buffer);
475 }
476 Err(_) => break,
477 }
478 }
479 }
480 body_bytes
481 }
482 PipelineData::ListStream(stream, _) => {
483 let items: Vec<_> = stream.into_iter().collect();
485 let json_value = serde_json::Value::Array(
486 items
487 .into_iter()
488 .map(|v| crate::response::value_to_json(&v))
489 .collect(),
490 );
491 serde_json::to_string(&json_value)
492 .unwrap_or_default()
493 .into_bytes()
494 }
495 };
496
497 let config = call.opt::<Value>(engine_state, stack, 1);
499
500 let mut headers = HashMap::new();
501 let mut preserve_host = true;
502 let mut strip_prefix: Option<String> = None;
503 let mut query: Option<HashMap<String, String>> = None;
504
505 if let Ok(Some(config_value)) = config {
506 if let Ok(record) = config_value.as_record() {
507 if let Some(headers_value) = record.get("headers") {
509 if let Ok(headers_record) = headers_value.as_record() {
510 for (k, v) in headers_record.iter() {
511 let header_value = match v {
512 Value::String { val, .. } => {
513 crate::response::HeaderValue::Single(val.clone())
514 }
515 Value::List { vals, .. } => {
516 let strings: Vec<String> = vals
517 .iter()
518 .filter_map(|v| v.as_str().ok())
519 .map(|s| s.to_string())
520 .collect();
521 crate::response::HeaderValue::Multiple(strings)
522 }
523 _ => continue, };
525 headers.insert(k.clone(), header_value);
526 }
527 }
528 }
529
530 if let Some(preserve_host_value) = record.get("preserve_host") {
532 if let Ok(ph) = preserve_host_value.as_bool() {
533 preserve_host = ph;
534 }
535 }
536
537 if let Some(strip_prefix_value) = record.get("strip_prefix") {
539 if let Ok(prefix) = strip_prefix_value.as_str() {
540 strip_prefix = Some(prefix.to_string());
541 }
542 }
543
544 if let Some(query_value) = record.get("query") {
546 if let Ok(query_record) = query_value.as_record() {
547 let mut query_map = HashMap::new();
548 for (k, v) in query_record.iter() {
549 if let Ok(v_str) = v.as_str() {
550 query_map.insert(k.clone(), v_str.to_string());
551 }
552 }
553 query = Some(query_map);
554 }
555 }
556 }
557 }
558
559 let response = Response {
560 status: 200,
561 headers: HashMap::new(),
562 body_type: ResponseBodyType::ReverseProxy {
563 target_url,
564 headers,
565 preserve_host,
566 strip_prefix,
567 request_body,
568 query,
569 },
570 };
571
572 RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
573 if let Some(tx) = tx.borrow_mut().take() {
574 tx.send(response).map_err(|_| {
575 ShellError::Generic(GenericError::new(
576 "Failed to send response",
577 "Channel closed",
578 call.head,
579 ))
580 })?;
581 }
582 Ok(())
583 })?;
584
585 Ok(PipelineData::Empty)
586 }
587}
588
/// Type implementing the `.mj` command.
///
/// With the `cross-stream` feature it can hold a store handle, which the
/// `--topic` mode reads templates from.
#[derive(Clone)]
pub struct MjCommand {
    #[cfg(feature = "cross-stream")]
    store: Option<xs::store::Store>,
}

impl MjCommand {
    /// Creates the command without a store.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "cross-stream")]
            store: None,
        }
    }

    /// Creates the command backed by `store`.
    #[cfg(feature = "cross-stream")]
    pub fn with_store(store: xs::store::Store) -> Self {
        let store = Some(store);
        Self { store }
    }
}

impl Default for MjCommand {
    /// Same as [`MjCommand::new`].
    fn default() -> Self {
        Self::new()
    }
}
614
615impl Command for MjCommand {
616 fn name(&self) -> &str {
617 ".mj"
618 }
619
620 fn description(&self) -> &str {
621 "Render a minijinja template with context from input"
622 }
623
624 fn signature(&self) -> Signature {
625 Signature::build(".mj")
626 .optional("file", SyntaxShape::String, "template file path")
627 .named(
628 "inline",
629 SyntaxShape::String,
630 "inline template string",
631 Some('i'),
632 )
633 .named(
634 "topic",
635 SyntaxShape::String,
636 "load template from a store topic",
637 Some('t'),
638 )
639 .input_output_types(vec![(Type::Record(vec![].into()), Type::String)])
640 .category(Category::Custom("http".into()))
641 }
642
643 fn run(
644 &self,
645 engine_state: &EngineState,
646 stack: &mut Stack,
647 call: &Call,
648 input: PipelineData,
649 ) -> Result<PipelineData, ShellError> {
650 let head = call.head;
651 let file: Option<String> = call.opt(engine_state, stack, 0)?;
652 let inline: Option<String> = call.get_flag(engine_state, stack, "inline")?;
653 let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
654
655 let mode_count = file.is_some() as u8 + inline.is_some() as u8 + topic.is_some() as u8;
656 if mode_count > 1 {
657 return Err(ShellError::Generic(GenericError::new(
658 "Cannot combine file, --inline, and --topic",
659 "use exactly one of: file path, --inline, or --topic",
660 head,
661 )));
662 }
663 if mode_count == 0 {
664 return Err(ShellError::Generic(GenericError::new(
665 "No template specified",
666 "provide a file path, --inline, or --topic",
667 head,
668 )));
669 }
670
671 let context = match input {
673 PipelineData::Value(val, _) => nu_value_to_minijinja(&val),
674 PipelineData::Empty => minijinja::Value::from(()),
675 _ => {
676 return Err(ShellError::TypeMismatch {
677 err_message: "expected record input".into(),
678 span: head,
679 });
680 }
681 };
682
683 let mut env = Environment::new();
685 env.set_auto_escape_callback(|_| AutoEscape::Html);
686 let tmpl = if let Some(ref path) = file {
687 let path = std::path::Path::new(path);
689 let abs_path = if path.is_absolute() {
690 path.to_path_buf()
691 } else {
692 std::env::current_dir().unwrap_or_default().join(path)
693 };
694 if let Some(parent) = abs_path.parent() {
695 env.set_loader(path_loader(parent));
696 }
697 let name = abs_path
698 .file_name()
699 .and_then(|n| n.to_str())
700 .unwrap_or("template");
701 env.get_template(name).map_err(|e| {
702 ShellError::Generic(GenericError::new(
703 format!("Template error: {e}"),
704 e.to_string(),
705 head,
706 ))
707 })?
708 } else if let Some(ref topic_name) = topic {
709 #[cfg(feature = "cross-stream")]
711 {
712 let store = self.store.as_ref().ok_or_else(|| {
713 ShellError::Generic(GenericError::new(
714 "--topic requires --store",
715 "server must be started with --store to use --topic",
716 head,
717 ))
718 })?;
719 let source = load_topic_content(store, topic_name).ok_or_else(|| {
720 ShellError::Generic(GenericError::new(
721 format!("Topic not found: {topic_name}"),
722 "no content in store for this topic",
723 head,
724 ))
725 })?;
726 let topic_store = store.clone();
727 env.set_loader(move |name: &str| Ok(load_topic_content(&topic_store, name)));
728 env.add_template_owned(topic_name.clone(), source)
729 .map_err(|e| {
730 ShellError::Generic(GenericError::new(
731 format!("Template parse error: {e}"),
732 e.to_string(),
733 head,
734 ))
735 })?;
736 env.get_template(topic_name).map_err(|e| {
737 ShellError::Generic(GenericError::new(
738 format!("Template error: {e}"),
739 e.to_string(),
740 head,
741 ))
742 })?
743 }
744 #[cfg(not(feature = "cross-stream"))]
745 {
746 let _ = topic_name;
747 return Err(ShellError::Generic(GenericError::new(
748 "--topic requires cross-stream feature",
749 "built without store support",
750 head,
751 )));
752 }
753 } else {
754 let source = inline.unwrap();
756 env.add_template_owned("template".to_string(), source)
757 .map_err(|e| {
758 ShellError::Generic(GenericError::new(
759 format!("Template parse error: {e}"),
760 e.to_string(),
761 head,
762 ))
763 })?;
764 env.get_template("template").map_err(|e| {
765 ShellError::Generic(GenericError::new(
766 format!("Failed to get template: {e}"),
767 e.to_string(),
768 head,
769 ))
770 })?
771 };
772
773 let rendered = tmpl.render(&context).map_err(|e| {
774 ShellError::Generic(GenericError::new(
775 format!("Template render error: {e}"),
776 e.to_string(),
777 head,
778 ))
779 })?;
780
781 Ok(Value::string(rendered, head).into_pipeline_data())
782 }
783}
784
785fn nu_value_to_minijinja(val: &Value) -> minijinja::Value {
787 let json = value_to_json(val, &Config::default()).unwrap_or(serde_json::Value::Null);
788 minijinja::Value::from_serialize(&json)
789}
790
/// Type implementing the `.mj compile` command.
///
/// With the `cross-stream` feature it can hold a store handle, which the
/// `--topic` mode reads templates from.
#[derive(Clone)]
pub struct MjCompileCommand {
    #[cfg(feature = "cross-stream")]
    store: Option<xs::store::Store>,
}

impl MjCompileCommand {
    /// Creates the command without a store.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "cross-stream")]
            store: None,
        }
    }

    /// Creates the command backed by `store`.
    #[cfg(feature = "cross-stream")]
    pub fn with_store(store: xs::store::Store) -> Self {
        let store = Some(store);
        Self { store }
    }
}

impl Default for MjCompileCommand {
    /// Same as [`MjCompileCommand::new`].
    fn default() -> Self {
        Self::new()
    }
}
818
819impl Command for MjCompileCommand {
820 fn name(&self) -> &str {
821 ".mj compile"
822 }
823
824 fn description(&self) -> &str {
825 "Compile a minijinja template, returning a reusable compiled template"
826 }
827
828 fn signature(&self) -> Signature {
829 Signature::build(".mj compile")
830 .optional("file", SyntaxShape::String, "template file path")
831 .named(
832 "inline",
833 SyntaxShape::Any,
834 "inline template (string or {__html: string})",
835 Some('i'),
836 )
837 .named(
838 "topic",
839 SyntaxShape::String,
840 "load template from a store topic",
841 Some('t'),
842 )
843 .input_output_types(vec![(
844 Type::Nothing,
845 Type::Custom("CompiledTemplate".into()),
846 )])
847 .category(Category::Custom("http".into()))
848 }
849
850 fn run(
851 &self,
852 engine_state: &EngineState,
853 stack: &mut Stack,
854 call: &Call,
855 _input: PipelineData,
856 ) -> Result<PipelineData, ShellError> {
857 let head = call.head;
858 let file: Option<String> = call.opt(engine_state, stack, 0)?;
859 let inline: Option<Value> = call.get_flag(engine_state, stack, "inline")?;
860 let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
861
862 let inline_str: Option<String> = match &inline {
864 None => None,
865 Some(val) => match val {
866 Value::String { val, .. } => Some(val.clone()),
867 Value::Record { val, .. } => {
868 if let Some(html_val) = val.get("__html") {
869 match html_val {
870 Value::String { val, .. } => Some(val.clone()),
871 _ => {
872 return Err(ShellError::Generic(GenericError::new(
873 "__html must be a string",
874 "expected string value",
875 head,
876 )));
877 }
878 }
879 } else {
880 return Err(ShellError::Generic(GenericError::new(
881 "Record must have __html field",
882 "expected {__html: string}",
883 head,
884 )));
885 }
886 }
887 _ => {
888 return Err(ShellError::Generic(GenericError::new(
889 "--inline must be string or {__html: string}",
890 "invalid type",
891 head,
892 )));
893 }
894 },
895 };
896
897 let mode_count = file.is_some() as u8 + inline_str.is_some() as u8 + topic.is_some() as u8;
898 if mode_count > 1 {
899 return Err(ShellError::Generic(GenericError::new(
900 "Cannot combine file, --inline, and --topic",
901 "use exactly one of: file path, --inline, or --topic",
902 head,
903 )));
904 }
905 if mode_count == 0 {
906 return Err(ShellError::Generic(GenericError::new(
907 "No template specified",
908 "provide a file path, --inline, or --topic",
909 head,
910 )));
911 }
912
913 let hash = if let Some(ref path) = file {
914 let path = std::path::Path::new(path);
916 let abs_path = if path.is_absolute() {
917 path.to_path_buf()
918 } else {
919 std::env::current_dir().unwrap_or_default().join(path)
920 };
921 let base_dir = abs_path.parent().unwrap_or(&abs_path).to_path_buf();
922 let source = std::fs::read_to_string(&abs_path).map_err(|e| {
923 ShellError::Generic(GenericError::new(
924 format!("Failed to read template file: {e}"),
925 "could not read file",
926 head,
927 ))
928 })?;
929 compile_template(&source, &base_dir)
930 } else if let Some(ref topic_name) = topic {
931 #[cfg(feature = "cross-stream")]
933 {
934 let store = self.store.as_ref().ok_or_else(|| {
935 ShellError::Generic(GenericError::new(
936 "--topic requires --store",
937 "server must be started with --store to use --topic",
938 head,
939 ))
940 })?;
941 let source = load_topic_content(store, topic_name).ok_or_else(|| {
942 ShellError::Generic(GenericError::new(
943 format!("Topic not found: {topic_name}"),
944 "no content in store for this topic",
945 head,
946 ))
947 })?;
948 let topic_store = store.clone();
949 let base_dir = std::path::PathBuf::from(format!("__topic__/{topic_name}"));
951 compile_template_with_loader(&source, &base_dir, move |name: &str| {
952 Ok(load_topic_content(&topic_store, name))
953 })
954 }
955 #[cfg(not(feature = "cross-stream"))]
956 {
957 let _ = topic_name;
958 return Err(ShellError::Generic(GenericError::new(
959 "--topic requires cross-stream feature",
960 "built without store support",
961 head,
962 )));
963 }
964 } else {
965 let source = inline_str.unwrap();
967 let base_dir = std::path::PathBuf::from("__inline__");
968 compile_template_with_loader(&source, &base_dir, |_| Ok(None))
969 };
970
971 let hash = hash.map_err(|e| {
972 ShellError::Generic(GenericError::new(
973 format!("Template compile error: {e}"),
974 e.to_string(),
975 head,
976 ))
977 })?;
978
979 Ok(Value::custom(Box::new(CompiledTemplate { hash }), head).into_pipeline_data())
980 }
981}
982
/// Stateless type implementing the `.mj render` command.
#[derive(Clone)]
pub struct MjRenderCommand;

impl MjRenderCommand {
    /// Creates the command.
    pub fn new() -> Self {
        MjRenderCommand
    }
}

impl Default for MjRenderCommand {
    /// Same as [`MjRenderCommand::new`].
    fn default() -> Self {
        MjRenderCommand::new()
    }
}
999
1000impl Command for MjRenderCommand {
1001 fn name(&self) -> &str {
1002 ".mj render"
1003 }
1004
1005 fn description(&self) -> &str {
1006 "Render a compiled minijinja template with context from input"
1007 }
1008
1009 fn signature(&self) -> Signature {
1010 Signature::build(".mj render")
1011 .required(
1012 "template",
1013 SyntaxShape::Any,
1014 "compiled template from '.mj compile'",
1015 )
1016 .input_output_types(vec![(Type::Record(vec![].into()), Type::String)])
1017 .category(Category::Custom("http".into()))
1018 }
1019
1020 fn run(
1021 &self,
1022 engine_state: &EngineState,
1023 stack: &mut Stack,
1024 call: &Call,
1025 input: PipelineData,
1026 ) -> Result<PipelineData, ShellError> {
1027 let head = call.head;
1028 let template_val: Value = call.req(engine_state, stack, 0)?;
1029
1030 let compiled = match template_val {
1032 Value::Custom { val, .. } => val
1033 .as_any()
1034 .downcast_ref::<CompiledTemplate>()
1035 .ok_or_else(|| ShellError::TypeMismatch {
1036 err_message: "expected CompiledTemplate".into(),
1037 span: head,
1038 })?
1039 .clone(),
1040 _ => {
1041 return Err(ShellError::TypeMismatch {
1042 err_message: "expected CompiledTemplate from '.mj compile'".into(),
1043 span: head,
1044 });
1045 }
1046 };
1047
1048 let context = match input {
1050 PipelineData::Value(val, _) => nu_value_to_minijinja(&val),
1051 PipelineData::Empty => minijinja::Value::from(()),
1052 _ => {
1053 return Err(ShellError::TypeMismatch {
1054 err_message: "expected record input".into(),
1055 span: head,
1056 });
1057 }
1058 };
1059
1060 let rendered = compiled.render(&context).map_err(|e| {
1062 ShellError::Generic(GenericError::new(
1063 format!("Template render error: {e}"),
1064 e.to_string(),
1065 head,
1066 ))
1067 })?;
1068
1069 Ok(Value::string(rendered, head).into_pipeline_data())
1070 }
1071}
1072
1073struct SyntaxHighlighter {
1076 syntax_set: SyntaxSet,
1077}
1078
1079impl SyntaxHighlighter {
1080 fn new() -> Self {
1081 const SYNTAX_SET: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/syntax_set.bin"));
1082 let syntax_set = syntect::dumps::from_binary(SYNTAX_SET);
1083 Self { syntax_set }
1084 }
1085
1086 fn highlight(&self, code: &str, lang: Option<&str>) -> String {
1087 let syntax = match lang {
1088 Some(lang) => self
1089 .syntax_set
1090 .find_syntax_by_token(lang)
1091 .or_else(|| self.syntax_set.find_syntax_by_extension(lang)),
1092 None => None,
1093 }
1094 .unwrap_or_else(|| self.syntax_set.find_syntax_plain_text());
1095
1096 let mut html_generator = ClassedHTMLGenerator::new_with_class_style(
1097 syntax,
1098 &self.syntax_set,
1099 ClassStyle::Spaced,
1100 );
1101
1102 for line in LinesWithEndings::from(code) {
1103 let _ = html_generator.parse_html_for_line_which_includes_newline(line);
1104 }
1105
1106 html_generator.finalize()
1107 }
1108
1109 fn list_syntaxes(&self) -> Vec<(String, Vec<String>)> {
1110 self.syntax_set
1111 .syntaxes()
1112 .iter()
1113 .map(|s| (s.name.clone(), s.file_extensions.clone()))
1114 .collect()
1115 }
1116}
1117
1118static HIGHLIGHTER: OnceLock<SyntaxHighlighter> = OnceLock::new();
1119
1120fn get_highlighter() -> &'static SyntaxHighlighter {
1121 HIGHLIGHTER.get_or_init(SyntaxHighlighter::new)
1122}
1123
/// Stateless type implementing the `.highlight` command.
#[derive(Clone)]
pub struct HighlightCommand;

impl HighlightCommand {
    /// Creates the command.
    pub fn new() -> Self {
        HighlightCommand
    }
}

impl Default for HighlightCommand {
    /// Same as [`HighlightCommand::new`].
    fn default() -> Self {
        HighlightCommand::new()
    }
}
1140
1141impl Command for HighlightCommand {
1142 fn name(&self) -> &str {
1143 ".highlight"
1144 }
1145
1146 fn description(&self) -> &str {
1147 "Syntax highlight code, outputting HTML with CSS classes"
1148 }
1149
1150 fn signature(&self) -> Signature {
1151 Signature::build(".highlight")
1152 .required("lang", SyntaxShape::String, "language for highlighting")
1153 .input_output_types(vec![(Type::String, Type::record())])
1154 .category(Category::Custom("http".into()))
1155 }
1156
1157 fn run(
1158 &self,
1159 engine_state: &EngineState,
1160 stack: &mut Stack,
1161 call: &Call,
1162 input: PipelineData,
1163 ) -> Result<PipelineData, ShellError> {
1164 let head = call.head;
1165 let lang: String = call.req(engine_state, stack, 0)?;
1166
1167 let code = match input {
1168 PipelineData::Value(Value::String { val, .. }, _) => val,
1169 PipelineData::ByteStream(stream, _) => stream.into_string()?,
1170 _ => {
1171 return Err(ShellError::TypeMismatch {
1172 err_message: "expected string input".into(),
1173 span: head,
1174 });
1175 }
1176 };
1177
1178 let highlighter = get_highlighter();
1179 let html = highlighter.highlight(&code, Some(&lang));
1180
1181 Ok(Value::record(
1182 nu_protocol::record! {
1183 "__html" => Value::string(html, head),
1184 },
1185 head,
1186 )
1187 .into_pipeline_data())
1188 }
1189}
1190
/// Stateless type implementing the `.highlight theme` command.
#[derive(Clone)]
pub struct HighlightThemeCommand;

impl HighlightThemeCommand {
    /// Creates the command.
    pub fn new() -> Self {
        HighlightThemeCommand
    }
}

impl Default for HighlightThemeCommand {
    /// Same as [`HighlightThemeCommand::new`].
    fn default() -> Self {
        HighlightThemeCommand::new()
    }
}
1207
1208impl Command for HighlightThemeCommand {
1209 fn name(&self) -> &str {
1210 ".highlight theme"
1211 }
1212
1213 fn description(&self) -> &str {
1214 "List available themes or get CSS for a specific theme"
1215 }
1216
1217 fn signature(&self) -> Signature {
1218 Signature::build(".highlight theme")
1219 .optional("name", SyntaxShape::String, "theme name (omit to list all)")
1220 .input_output_types(vec![
1221 (Type::Nothing, Type::List(Box::new(Type::String))),
1222 (Type::Nothing, Type::String),
1223 ])
1224 .category(Category::Custom("http".into()))
1225 }
1226
1227 fn run(
1228 &self,
1229 engine_state: &EngineState,
1230 stack: &mut Stack,
1231 call: &Call,
1232 _input: PipelineData,
1233 ) -> Result<PipelineData, ShellError> {
1234 let head = call.head;
1235 let name: Option<String> = call.opt(engine_state, stack, 0)?;
1236
1237 let assets = syntect_assets::assets::HighlightingAssets::from_binary();
1238
1239 match name {
1240 None => {
1241 let themes: Vec<Value> = assets.themes().map(|t| Value::string(t, head)).collect();
1242 Ok(Value::list(themes, head).into_pipeline_data())
1243 }
1244 Some(theme_name) => {
1245 let theme = assets.get_theme(&theme_name);
1246 let css = syntect::html::css_for_theme_with_class_style(theme, ClassStyle::Spaced)
1247 .map_err(|e| {
1248 ShellError::Generic(GenericError::new(
1249 format!("Failed to generate CSS: {e}"),
1250 e.to_string(),
1251 head,
1252 ))
1253 })?;
1254 Ok(Value::string(css, head).into_pipeline_data())
1255 }
1256 }
1257 }
1258}
1259
/// Nushell command `.highlight lang`.
///
/// Carries no state; all work happens in its [`Command`] implementation.
#[derive(Clone)]
pub struct HighlightLangCommand;

impl HighlightLangCommand {
    /// Creates the command value.
    pub fn new() -> Self {
        HighlightLangCommand
    }
}

impl Default for HighlightLangCommand {
    /// Same as [`HighlightLangCommand::new`].
    fn default() -> Self {
        HighlightLangCommand::new()
    }
}
1276
1277impl Command for HighlightLangCommand {
1278 fn name(&self) -> &str {
1279 ".highlight lang"
1280 }
1281
1282 fn description(&self) -> &str {
1283 "List available languages for syntax highlighting"
1284 }
1285
1286 fn signature(&self) -> Signature {
1287 Signature::build(".highlight lang")
1288 .input_output_types(vec![(Type::Nothing, Type::List(Box::new(Type::record())))])
1289 .category(Category::Custom("http".into()))
1290 }
1291
1292 fn run(
1293 &self,
1294 _engine_state: &EngineState,
1295 _stack: &mut Stack,
1296 call: &Call,
1297 _input: PipelineData,
1298 ) -> Result<PipelineData, ShellError> {
1299 let head = call.head;
1300 let highlighter = get_highlighter();
1301 let langs: Vec<Value> = highlighter
1302 .list_syntaxes()
1303 .into_iter()
1304 .map(|(name, exts)| {
1305 Value::record(
1306 nu_protocol::record! {
1307 "name" => Value::string(name, head),
1308 "extensions" => Value::list(
1309 exts.into_iter().map(|e| Value::string(e, head)).collect(),
1310 head
1311 ),
1312 },
1313 head,
1314 )
1315 })
1316 .collect();
1317 Ok(Value::list(langs, head).into_pipeline_data())
1318 }
1319}
1320
1321use pulldown_cmark::{html, CodeBlockKind, Event, Parser as MarkdownParser, Tag, TagEnd};
1324
/// Nushell command `.md`.
///
/// Carries no state; all work happens in its [`Command`] implementation.
#[derive(Clone)]
pub struct MdCommand;

impl MdCommand {
    /// Creates the command value.
    pub fn new() -> Self {
        MdCommand
    }
}

impl Default for MdCommand {
    /// Same as [`MdCommand::new`].
    fn default() -> Self {
        MdCommand::new()
    }
}
1339
1340impl Command for MdCommand {
1341 fn name(&self) -> &str {
1342 ".md"
1343 }
1344
1345 fn description(&self) -> &str {
1346 "Convert Markdown to HTML with syntax-highlighted code blocks"
1347 }
1348
1349 fn signature(&self) -> Signature {
1350 Signature::build(".md")
1351 .input_output_types(vec![
1352 (Type::String, Type::record()),
1353 (Type::record(), Type::record()),
1354 ])
1355 .category(Category::Custom("http".into()))
1356 }
1357
1358 fn run(
1359 &self,
1360 _engine_state: &EngineState,
1361 _stack: &mut Stack,
1362 call: &Call,
1363 input: PipelineData,
1364 ) -> Result<PipelineData, ShellError> {
1365 let head = call.head;
1366
1367 let (markdown, trusted) = match input.into_value(head)? {
1369 Value::String { val, .. } => (val, false),
1370 Value::Record { val, .. } => {
1371 if let Some(html_val) = val.get("__html") {
1372 (html_val.as_str()?.to_string(), true)
1373 } else {
1374 return Err(ShellError::TypeMismatch {
1375 err_message: "expected string or {__html: ...}".into(),
1376 span: head,
1377 });
1378 }
1379 }
1380 other => {
1381 return Err(ShellError::TypeMismatch {
1382 err_message: format!(
1383 "expected string or {{__html: ...}}, got {}",
1384 other.get_type()
1385 ),
1386 span: head,
1387 });
1388 }
1389 };
1390
1391 let highlighter = get_highlighter();
1392
1393 let mut in_code_block = false;
1394 let mut current_code = String::new();
1395 let mut current_lang: Option<String> = None;
1396
1397 let mut options = pulldown_cmark::Options::empty();
1398 options.insert(pulldown_cmark::Options::ENABLE_TABLES);
1399 options.insert(pulldown_cmark::Options::ENABLE_STRIKETHROUGH);
1400 options.insert(pulldown_cmark::Options::ENABLE_TASKLISTS);
1401 options.insert(pulldown_cmark::Options::ENABLE_FOOTNOTES);
1402 options.insert(pulldown_cmark::Options::ENABLE_HEADING_ATTRIBUTES);
1403 options.insert(pulldown_cmark::Options::ENABLE_GFM);
1404 options.insert(pulldown_cmark::Options::ENABLE_DEFINITION_LIST);
1405 let parser = MarkdownParser::new_ext(&markdown, options).map(|event| match event {
1406 Event::Start(Tag::CodeBlock(kind)) => {
1407 in_code_block = true;
1408 current_code.clear();
1409 current_lang = match kind {
1410 CodeBlockKind::Fenced(info) => {
1411 let lang = info.split_whitespace().next().unwrap_or("");
1412 if lang.is_empty() {
1413 None
1414 } else {
1415 Some(lang.to_string())
1416 }
1417 }
1418 CodeBlockKind::Indented => None,
1419 };
1420 Event::Text("".into())
1421 }
1422 Event::End(TagEnd::CodeBlock) => {
1423 in_code_block = false;
1424 let highlighted = highlighter.highlight(¤t_code, current_lang.as_deref());
1425 let mut html_out = String::new();
1426 html_out.push_str("<pre><code");
1427 if let Some(lang) = ¤t_lang {
1428 let lang = v_htmlescape::escape(lang);
1429 html_out.push_str(&format!(" class=\"language-{lang}\""));
1430 }
1431 html_out.push('>');
1432 html_out.push_str(&highlighted);
1433 html_out.push_str("</code></pre>");
1434 Event::Html(html_out.into())
1435 }
1436 Event::Text(text) => {
1437 if in_code_block {
1438 current_code.push_str(&text);
1439 Event::Text("".into())
1440 } else {
1441 Event::Text(text)
1442 }
1443 }
1444 Event::Html(html) => {
1446 if trusted {
1447 Event::Html(html)
1448 } else {
1449 Event::Text(html) }
1451 }
1452 Event::InlineHtml(html) => {
1453 if trusted {
1454 Event::InlineHtml(html)
1455 } else {
1456 Event::Text(html)
1457 }
1458 }
1459 e => e,
1460 });
1461
1462 let mut html_output = String::new();
1463 html::push_html(&mut html_output, parser);
1464
1465 Ok(Value::record(
1466 nu_protocol::record! {
1467 "__html" => Value::string(html_output, head),
1468 },
1469 head,
1470 )
1471 .into_pipeline_data())
1472 }
1473}
1474
/// Replacement for nushell's `print` command.
///
/// Carries no state; all work happens in its [`Command`] implementation.
#[derive(Clone)]
pub struct PrintCommand;

impl PrintCommand {
    /// Creates the command value.
    pub fn new() -> Self {
        PrintCommand
    }
}

impl Default for PrintCommand {
    /// Same as [`PrintCommand::new`].
    fn default() -> Self {
        PrintCommand::new()
    }
}
1491
1492impl Command for PrintCommand {
1493 fn name(&self) -> &str {
1494 "print"
1495 }
1496
1497 fn description(&self) -> &str {
1498 "Print the given values to the http-nu logging system."
1499 }
1500
1501 fn extra_description(&self) -> &str {
1502 r#"This command outputs to http-nu's logging system rather than stdout/stderr.
1503Messages appear in both human-readable and JSONL output modes.
1504
1505`print` may be used inside blocks of code (e.g.: hooks) to display text during execution without interfering with the pipeline."#
1506 }
1507
1508 fn search_terms(&self) -> Vec<&str> {
1509 vec!["display"]
1510 }
1511
1512 fn signature(&self) -> Signature {
1513 Signature::build("print")
1514 .input_output_types(vec![
1515 (Type::Nothing, Type::Nothing),
1516 (Type::Any, Type::Nothing),
1517 ])
1518 .allow_variants_without_examples(true)
1519 .rest("rest", SyntaxShape::Any, "the values to print")
1520 .switch(
1521 "no-newline",
1522 "print without inserting a newline for the line ending",
1523 Some('n'),
1524 )
1525 .switch("stderr", "print to stderr instead of stdout", Some('e'))
1526 .switch(
1527 "raw",
1528 "print without formatting (including binary data)",
1529 Some('r'),
1530 )
1531 .category(Category::Strings)
1532 }
1533
1534 fn run(
1535 &self,
1536 engine_state: &EngineState,
1537 stack: &mut Stack,
1538 call: &Call,
1539 input: PipelineData,
1540 ) -> Result<PipelineData, ShellError> {
1541 let args: Vec<Value> = call.rest(engine_state, stack, 0)?;
1542 let no_newline = call.has_flag(engine_state, stack, "no-newline")?;
1543 let config = stack.get_config(engine_state);
1544
1545 let format_value = |val: &Value| -> String { val.to_expanded_string(" ", &config) };
1546
1547 if !args.is_empty() {
1548 let messages: Vec<String> = args.iter().map(format_value).collect();
1549 let message = if no_newline {
1550 messages.join("")
1551 } else {
1552 messages.join("\n")
1553 };
1554 log_print(&message);
1555 } else if !input.is_nothing() {
1556 let message = match input {
1557 PipelineData::Value(val, _) => format_value(&val),
1558 PipelineData::ListStream(stream, _) => {
1559 let vals: Vec<String> = stream.into_iter().map(|v| format_value(&v)).collect();
1560 vals.join("\n")
1561 }
1562 PipelineData::ByteStream(stream, _) => stream.into_string()?,
1563 PipelineData::Empty => String::new(),
1564 };
1565 if !message.is_empty() {
1566 log_print(&message);
1567 }
1568 }
1569
1570 Ok(PipelineData::empty())
1571 }
1572
1573 fn examples(&self) -> Vec<Example<'_>> {
1574 vec![
1575 Example {
1576 description: "Print 'hello world'",
1577 example: r#"print "hello world""#,
1578 result: None,
1579 },
1580 Example {
1581 description: "Print the sum of 2 and 3",
1582 example: r#"print (2 + 3)"#,
1583 result: None,
1584 },
1585 ]
1586 }
1587}
1588
/// Nushell command `.run`.
///
/// Carries no state; all work happens in its [`Command`] implementation.
#[derive(Clone)]
pub struct RunNuCommand;

impl RunNuCommand {
    /// Creates the command value.
    pub fn new() -> Self {
        RunNuCommand
    }
}

impl Default for RunNuCommand {
    /// Same as [`RunNuCommand::new`].
    fn default() -> Self {
        RunNuCommand::new()
    }
}
1605
1606impl Command for RunNuCommand {
1607 fn name(&self) -> &str {
1608 ".run"
1609 }
1610
1611 fn description(&self) -> &str {
1612 "Parse, compile, and evaluate a nushell pipeline string in a sandboxed engine state."
1613 }
1614
1615 fn extra_description(&self) -> &str {
1616 r#"The submitted script is parsed and compiled against a clone of the current engine state.
1617Any defs, lets, or modules introduced by the script live only for the duration of the call --
1618they do not leak back into the calling engine. Pipeline input is forwarded to the script as `$in`;
1619the caller's local bindings and environment are not visible inside the sandbox."#
1620 }
1621
1622 fn signature(&self) -> Signature {
1623 Signature::build(".run")
1624 .input_output_types(vec![(Type::Any, Type::Any)])
1625 .required("script", SyntaxShape::String, "nushell source to evaluate")
1626 .category(Category::Experimental)
1627 }
1628
1629 fn run(
1630 &self,
1631 engine_state: &EngineState,
1632 stack: &mut Stack,
1633 call: &Call,
1634 input: PipelineData,
1635 ) -> Result<PipelineData, ShellError> {
1636 use nu_engine::eval_block_with_early_return;
1637 use nu_parser::parse;
1638 use nu_protocol::{debugger::WithoutDebug, engine::StateWorkingSet, format_cli_error};
1639
1640 let script: String = call.req(engine_state, stack, 0)?;
1641
1642 let mut ws = StateWorkingSet::new(engine_state);
1643 let block = parse(&mut ws, Some("<.run>"), script.as_bytes(), false);
1644
1645 if !ws.parse_errors.is_empty() {
1646 let mut combined = String::new();
1647 for err in &ws.parse_errors {
1648 let se = ShellError::Generic(GenericError::new(
1649 "Parse error",
1650 format!("{err}"),
1651 err.span(),
1652 ));
1653 combined.push_str(&format_cli_error(None, &ws, &se, None));
1654 combined.push('\n');
1655 }
1656 let plain = nu_utils::strip_ansi_string_likely(combined);
1657 return Err(ShellError::Generic(GenericError::new_internal(plain, "")));
1658 }
1659 if !ws.compile_errors.is_empty() {
1660 let mut combined = String::new();
1661 for err in &ws.compile_errors {
1662 let se = ShellError::Generic(GenericError::new_internal(
1663 format!("Compile error {err}"),
1664 "",
1665 ));
1666 combined.push_str(&format_cli_error(None, &ws, &se, None));
1667 combined.push('\n');
1668 }
1669 let plain = nu_utils::strip_ansi_string_likely(combined);
1670 return Err(ShellError::Generic(GenericError::new_internal(plain, "")));
1671 }
1672
1673 let mut sandbox = engine_state.clone();
1674 sandbox.merge_delta(ws.render())?;
1675
1676 let mut sub_stack = Stack::new();
1677 eval_block_with_early_return::<WithoutDebug>(&sandbox, &mut sub_stack, &block, input)
1678 .map(|exec| exec.body)
1679 }
1680}
1681
1682#[derive(Clone)]
1685pub struct BusPubCommand {
1686 bus: Arc<Bus>,
1687}
1688
1689impl BusPubCommand {
1690 pub fn new(bus: Arc<Bus>) -> Self {
1691 Self { bus }
1692 }
1693}
1694
1695impl Command for BusPubCommand {
1696 fn name(&self) -> &str {
1697 ".bus pub"
1698 }
1699
1700 fn description(&self) -> &str {
1701 "Publish a value to an ephemeral in-process topic."
1702 }
1703
1704 fn signature(&self) -> Signature {
1705 Signature::build(".bus pub")
1706 .input_output_types(vec![(Type::Any, Type::Nothing)])
1707 .required("topic", SyntaxShape::String, "topic to publish to")
1708 .category(Category::Experimental)
1709 }
1710
1711 fn run(
1712 &self,
1713 engine_state: &EngineState,
1714 stack: &mut Stack,
1715 call: &Call,
1716 input: PipelineData,
1717 ) -> Result<PipelineData, ShellError> {
1718 let topic: String = call.req(engine_state, stack, 0)?;
1719 let value = input.into_value(call.head)?;
1720 self.bus.publish(topic, value);
1721 Ok(PipelineData::empty())
1722 }
1723}
1724
1725#[derive(Clone)]
1726pub struct BusSubCommand {
1727 bus: Arc<Bus>,
1728}
1729
1730impl BusSubCommand {
1731 pub fn new(bus: Arc<Bus>) -> Self {
1732 Self { bus }
1733 }
1734}
1735
1736impl Command for BusSubCommand {
1737 fn name(&self) -> &str {
1738 ".bus sub"
1739 }
1740
1741 fn description(&self) -> &str {
1742 "Subscribe to ephemeral in-process topics; yields {topic, value} records."
1743 }
1744
1745 fn extra_description(&self) -> &str {
1746 r#"With no argument, yields every published event. With a glob pattern (e.g. "tab-abc.*"),
1747yields only events whose topic matches. `*` matches any run of characters including dots."#
1748 }
1749
1750 fn signature(&self) -> Signature {
1751 Signature::build(".bus sub")
1752 .input_output_types(vec![(Type::Nothing, Type::Any)])
1753 .optional(
1754 "pattern",
1755 SyntaxShape::String,
1756 "glob pattern; omit for all topics",
1757 )
1758 .category(Category::Experimental)
1759 }
1760
1761 fn run(
1762 &self,
1763 engine_state: &EngineState,
1764 stack: &mut Stack,
1765 call: &Call,
1766 _input: PipelineData,
1767 ) -> Result<PipelineData, ShellError> {
1768 use nu_protocol::{record, ListStream, Signals};
1769
1770 let pattern: Option<String> = call.opt(engine_state, stack, 0)?;
1771 let span = call.head;
1772 let signals = engine_state.signals().clone();
1773
1774 let mut sub = self.bus.subscribe(pattern);
1775
1776 let (tx, rx) = std::sync::mpsc::channel::<crate::bus::BusEvent>();
1777 std::thread::spawn(move || {
1778 let rt = match tokio::runtime::Runtime::new() {
1779 Ok(rt) => rt,
1780 Err(_) => return,
1781 };
1782 rt.block_on(async move {
1783 while let Some(ev) = sub.recv().await {
1784 if tx.send(ev).is_err() {
1785 break;
1786 }
1787 }
1788 });
1789 });
1790
1791 let stream = ListStream::new(
1792 std::iter::from_fn(move || {
1793 use std::sync::mpsc::RecvTimeoutError;
1794 use std::time::Duration;
1795 loop {
1796 if signals.interrupted() {
1797 return None;
1798 }
1799 match rx.recv_timeout(Duration::from_millis(100)) {
1800 Ok(ev) => {
1801 let rec = record! {
1802 "topic" => Value::string(ev.topic, span),
1803 "value" => ev.value,
1804 };
1805 return Some(Value::record(rec, span));
1806 }
1807 Err(RecvTimeoutError::Timeout) => continue,
1808 Err(RecvTimeoutError::Disconnected) => return None,
1809 }
1810 }
1811 }),
1812 span,
1813 Signals::empty(),
1814 );
1815
1816 Ok(PipelineData::ListStream(stream, None))
1817 }
1818}